From d5ce32c67267eca4e11fe21753bcd97f85e60d8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannes=20H=C3=B6ke?= Date: Wed, 14 Dec 2016 17:15:52 +0100 Subject: [PATCH] removed Job.run_immediately and related code --- telegram/ext/jobqueue.py | 231 ++++++++------------------------------- tests/test_jobqueue.py | 93 ---------------- 2 files changed, 45 insertions(+), 279 deletions(-) diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index 76368389a..fad30d647 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -23,11 +23,9 @@ import time import warnings import datetime from numbers import Number -from threading import Thread, Lock, RLock, Event +from threading import Thread, Lock, Event from queue import PriorityQueue, Empty -from telegram.utils.promise import Promise - class Days(object): MON, TUE, WED, THU, FRI, SAT, SUN = range(7) @@ -58,7 +56,6 @@ class JobQueue(object): self.logger = logging.getLogger(self.__class__.__name__) self.__start_lock = Lock() self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick - self.__queue_lock = RLock() # to protect self.queue self.__tick = Event() self.__thread = None """:type: Thread""" @@ -132,8 +129,7 @@ class JobQueue(object): self.logger.debug('Putting job %s with t=%f', job.name, next_t) - with self.__queue_lock: - self.queue.put((next_t, job)) + self.queue.put((next_t, job)) # Wake up the loop if this job should be executed next self._set_next_peek(next_t) @@ -227,47 +223,6 @@ class JobQueue(object): self._put(job, next_t=time) return job - def update_job_due_time(self, job, due): - """ - Changes the due time of a job. - - Args: - job (Job): The job of which the due time should be changed. Must already exist in the - queue. - due (float): The new due time as a timestamp - - Returns: - float: The old due time of the job as a timestamp - """ - - with self.__queue_lock: - cache = list() - # Pop jobs from the priority queue one by one and check if it's the one we need. - # All other jobs are stashed away in the 'cache' list. Once the right job is found, - # it's put back into the queue with the new due time, together with all cached jobs. - try: - while True: - due_test, job_test = self.queue.get(block=False) - - if job_test is job: - self.queue.put((due, job_test)) - old_due = due_test - break - - else: - cache.append((due_test, job_test)) - - except Empty: - raise ValueError("Specified 'job' doesn't exist in the job queue") - - finally: - for item in reversed(cache): - self.queue.put(item) - - # Make sure the queue wakes up if the updated due time requires it - self._set_next_peek(due) - return old_due - def _set_next_peek(self, t): """ Set next peek if not defined or `t` is before next peek. @@ -283,56 +238,55 @@ class JobQueue(object): Run all jobs that are due and re-enqueue them with their interval. """ - with self.__queue_lock: - now = time.time() + now = time.time() - self.logger.debug('Ticking jobs with t=%f', now) + self.logger.debug('Ticking jobs with t=%f', now) - while True: + while True: + try: + t, job = self.queue.get(False) + + except Empty: + break + + self.logger.debug('Peeked at %s with t=%f', job.name, t) + + if t > now: + # We can get here in two conditions: + # 1. At the second or later pass of the while loop, after we've already + # processed the job(s) we were supposed to at this time. + # 2. At the first iteration of the loop only if `self.put()` had triggered + # `self.__tick` because `self._next_peek` wasn't set + self.logger.debug("Next task isn't due yet. Finished!") + self.queue.put((t, job)) + self._set_next_peek(t) + break + + if job._remove.is_set(): + self.logger.debug('Removing job %s', job.name) + job.job_queue = None + continue + + if job.enabled: try: - t, job = self.queue.get(False) + current_week_day = datetime.datetime.now().weekday() + if any(day == current_week_day for day in job.days): + self.logger.debug('Running job %s', job.name) + job.run(self.bot) - except Empty: - break + except: + self.logger.exception('An uncaught error was raised while executing job %s', + job.name) - self.logger.debug('Peeked at %s with t=%f', job.name, t) + else: + self.logger.debug('Skipping disabled job %s', job.name) - if t > now: - # We can get here in two conditions: - # 1. At the second or later pass of the while loop, after we've already - # processed the job(s) we were supposed to at this time. - # 2. At the first iteration of the loop only if `self.put()` had triggered - # `self.__tick` because `self._next_peek` wasn't set - self.logger.debug("Next task isn't due yet. Finished!") - self.queue.put((t, job)) - self._set_next_peek(t) - break + if job.repeat: + self._put(job, last_t=t) - if job._remove.is_set(): - self.logger.debug('Removing job %s', job.name) - job.job_queue = None - continue - - if job.enabled: - try: - current_week_day = datetime.datetime.now().weekday() - if any(day == current_week_day for day in job.days): - self.logger.debug('Running job %s', job.name) - job.run(self.bot) - - except: - self.logger.exception( - 'An uncaught error was raised while executing job %s', job.name) - - else: - self.logger.debug('Skipping disabled job %s', job.name) - - if job.repeat: - self._put(job, last_t=t) - - else: - self.logger.debug('Dropping non-repeating job %s', job.name) - job.job_queue = None + else: + self.logger.debug('Dropping non-repeating job %s', job.name) + job.job_queue = None def start(self): """ @@ -387,8 +341,7 @@ class JobQueue(object): def jobs(self): """Returns a tuple of all jobs that are currently in the ``JobQueue``""" - with self.__queue_lock: - return tuple(job[1] for job in self.queue.queue if job) + return tuple(job[1] for job in self.queue.queue if job) class Job(object): @@ -446,7 +399,6 @@ class Job(object): self._remove = Event() self._enabled = Event() self._enabled.set() - self._immediate_run_in_progress = Event() def run(self, bot): """Executes the callback function""" @@ -459,99 +411,6 @@ class Job(object): """ self._remove.set() - def _wrap_callback(self, old_due_promise, keep_schedule, skip_next): - """Wraps the callback function into two other functions and returns the result""" - original_callback = self.callback - - def rescheduled(bot, job): - """The callback function for the rescheduled run""" - # Run the original callback function - original_callback(bot, job) - - # If job is non-repeating, restore the callback, ignore everything else and bail - if not self.repeat: - self.callback = original_callback - return - - # Adjust the interval so that the next run is scheduled like it was before - if keep_schedule: - original_interval = self.interval - - try: - old_due = old_due_promise.result(timeout=1.0) - - except TimeoutError: - # Possible race condition when the JobQueue wants to run this job before it - # could run the Promise in the main thread. The JobQueue thread waits for - # the Promise to resolve, but the main thread waits for the JobQueue to release - # the __queue_lock so it can reschedule this job. - # In case that happens, simply wrap the callback function again and pretend - # like nothing happened yet. - self.callback = original_callback - self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next) - return - - self.interval = old_due - time.time() - - def next_callback(bot, job): - """The callback function for the run that was originally next""" - - # Restore callback and interval - self.callback = original_callback - - if keep_schedule: - self.interval = original_interval - - if not skip_next: - original_callback(bot, job) - - self._immediate_run_in_progress.clear() - - self.callback = next_callback - - return rescheduled - - def run_immediately(self, keep_schedule=True, skip_next=True): - """ - Puts this job to the front of the job queue, scheduled to run immediately. - - Args: - keep_schedule (Optional[bool]): If set to ``True``, which is the default, the job will - be re-scheduled so that the original schedule (as defined by the starting time and - interval) is not changed. If set to ``False``, the schedule will reset so that the - next scheduled time is calculated from the current time and the interval. - This parameter is ignored if the job is not repeating. - skip_next (Optional[bool]): If set to ``True``, which is the default, the next - scheduled execution will be skipped, effectively re-scheduling it to now. - If set to ``False``, this execution is an additional, completely unscheduled - execution. - This parameter is ignored if the job is not repeating. - - Raises: - NotImplementedError: If you use this method a second time before the job either ran - the originally scheduled execution once, or has skipped it. - ValueError: If the job is disabled at the moment - """ - if self._immediate_run_in_progress.is_set(): - raise NotImplementedError("You can't use 'run_immediately' again until the job has " - "returned to it's normal state. Sorry about that.") - - if not self.enabled: - raise ValueError("Job is disabled") - - self._immediate_run_in_progress.set() - # Wrap the old due time in a Promise - old_due_promise = Promise( - self.job_queue.update_job_due_time, # pylint: disable=no-member - [self, time.time()], - {}) - - # Wrap the callback in the wrapper function - self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next) - - # Run the promise to reschedule the job - old_due_promise.run() - @property def enabled(self): return self._enabled.is_set() diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 454f88a59..5ef279459 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -258,99 +258,6 @@ class JobQueueTest(BaseTest, unittest.TestCase): self.assertEqual(self.result, 1) self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1) - def test_update_job_due_time(self): - job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1)) - - sleep(0.5) - self.assertEqual(self.result, 0) - - self.jq.update_job_due_time(job, time.time() + 1) - - sleep(0.5) - self.assertEqual(self.result, 0) - - sleep(1.5) - self.assertEqual(self.result, 1) - - def test_job_run_immediately_one_time(self): - job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1)) - - sleep(0.5) - self.assertEqual(self.result, 0) - - job.run_immediately() - - sleep(0.5) - self.assertEqual(self.result, 1) - - def test_job_run_immediately_skip(self): - job = self.jq.repeating_job(self.job1, 1, first=2) - - sleep(0.5) # 0.5s | no runs - self.assertEqual(self.result, 0) - - job.run_immediately(keep_schedule=False, skip_next=True) - - sleep(0.5) # 1s | first run at 0.5s, rescheduled with interval=1 but skipping the next run - self.assertEqual(self.result, 1) - - sleep(1) # 2s | run at 1.5s was skipped - self.assertEqual(self.result, 1) - - sleep(1) # 3s | run at 2.5s was back to normal - self.assertEqual(self.result, 2) - - sleep(1) # 4s | just to confirm - self.assertEqual(self.result, 3) - - def test_job_run_immediately_keep(self): - job = self.jq.repeating_job(self.job1, 1) - - sleep(0.5) # 0.5s | no runs - self.assertEqual(self.result, 0) - - job.run_immediately(keep_schedule=True, skip_next=False) - - # 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule - sleep(0.25) - self.assertEqual(self.result, 1) - - sleep(0.5) # 1.25s | run at 1s, rescheduled with interval=1 - self.assertEqual(self.result, 2) - - sleep(0.5) # 1.75s | last run still at 1s - self.assertEqual(self.result, 2) - - sleep(1) # 2.25s | run at 2s was back to normal - self.assertEqual(self.result, 3) - - sleep(1) # 3.25s | just to confirm - self.assertEqual(self.result, 4) - - def test_job_run_immediately_keep_skip(self): - job = self.jq.repeating_job(self.job1, 1) - - sleep(0.5) # 0.5s | no runs - self.assertEqual(self.result, 0) - - job.run_immediately(keep_schedule=True, skip_next=True) - - # 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule... - sleep(0.25) - self.assertEqual(self.result, 1) - - sleep(0.5) # 1.25s | ...but run at 1s was skipped, rescheduled with interval=1 - self.assertEqual(self.result, 1) - - sleep(0.5) # 1.75s | last run still at 1s - self.assertEqual(self.result, 1) - - sleep(1) # 2.25s | the run at 2s was back to normal - self.assertEqual(self.result, 2) - - sleep(1) # 3.25s | just to confirm - self.assertEqual(self.result, 3) - if __name__ == '__main__': unittest.main()