diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index 4065e4d31..76368389a 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -23,9 +23,11 @@ import time import warnings import datetime from numbers import Number -from threading import Thread, Lock, Event +from threading import Thread, Lock, RLock, Event from queue import PriorityQueue, Empty +from telegram.utils.promise import Promise + class Days(object): MON, TUE, WED, THU, FRI, SAT, SUN = range(7) @@ -56,6 +58,7 @@ class JobQueue(object): self.logger = logging.getLogger(self.__class__.__name__) self.__start_lock = Lock() self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick + self.__queue_lock = RLock() # to protect self.queue self.__tick = Event() self.__thread = None """:type: Thread""" @@ -69,14 +72,44 @@ class JobQueue(object): Args: job (telegram.ext.Job): The ``Job`` instance representing the new job next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]): - Time in or at which the job should be executed first. Defaults to ``job.interval``. - If it is an ``int`` or a ``float``, it will be interpreted as seconds. If it is - a ``datetime.datetime``, it will be executed at the specified date and time. If it - is a ``datetime.time``, it will execute at the specified time today or, if the time - has already passed, tomorrow. - + Time in or at which the job should run for the first time. This parameter will be + interpreted depending on its type. + ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + ``datetime.datetime`` will be interpreted as a specific date and time at which the + job should run. + ``datetime.time`` will be interpreted as a specific time at which the job should + run. This could be either today or, if the time has already passed, tomorrow. """ - job.job_queue = self + warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.one_time_job', " + "'JobQueue.repeating_job' or 'JobQueue.daily_job' instead") + self._put(job, next_t=next_t) + + def _put(self, job, next_t=None, last_t=None): + """Queue a new job. + + Args: + job (telegram.ext.Job): The ``Job`` instance representing the new job + next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]): + Time in or at which the job should run for the first time. This parameter will be + interpreted depending on its type. + ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + ``datetime.datetime`` will be interpreted as a specific date and time at which the + job should run. + ``datetime.time`` will be interpreted as a specific time at which the job should + run. This could be either today or, if the time has already passed, tomorrow. + last_t (Optional[float]): Timestamp of the time when ``job`` was scheduled for in the + last ``put`` call. If provided, it will used to calculate the next timestamp more + accurately by accounting for the execution time of the job (and possibly others). + By default, the current timestamp is used. + """ + if job.job_queue is not self: + job.job_queue = self if next_t is None: next_t = job.interval @@ -95,20 +128,150 @@ class JobQueue(object): if isinstance(next_t, datetime.timedelta): next_t = next_t.total_seconds() - now = time.time() - next_t += now + next_t += last_t or time.time() self.logger.debug('Putting job %s with t=%f', job.name, next_t) - self.queue.put((next_t, job)) + + with self.__queue_lock: + self.queue.put((next_t, job)) # Wake up the loop if this job should be executed next self._set_next_peek(next_t) + def one_time_job(self, callback, when, context=None, name=None): + """Creates a new ``Job`` that runs once and adds it to the queue. + + Args: + callback (function): The callback function that should be executed by the new job. It + should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` + instance. It can be used to access it's ``context`` or change it to a repeating + job. + when (int, float, datetime.timedelta, datetime.datetime, datetime.time): + Time in or at which the job should run. This parameter will be interpreted + depending on its type. + ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + ``datetime.datetime`` will be interpreted as a specific date and time at which the + job should run. + ``datetime.time`` will be interpreted as a specific time at which the job should + run. This could be either today or, if the time has already passed, tomorrow. + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + name (Optional[str]): The name of the new job. Defaults to ``callback.__name__`` + + Returns: + Job: The new ``Job`` instance that has been added to the job queue. + """ + job = Job(callback, repeat=False, context=context, name=name) + self._put(job, next_t=when) + return job + + def repeating_job(self, callback, interval, first=None, context=None, name=None): + """Creates a new ``Job`` that runs once and adds it to the queue. + + Args: + callback (function): The callback function that should be executed by the new job. It + should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` + instance. It can be used to access it's ``context``, terminate the job or change + its interval. + interval (int, float, datetime.timedelta): The interval in which the job will run. + If it is an ``int`` or a ``float``, it will be interpreted as seconds. + first (int, float, datetime.timedelta, datetime.datetime, datetime.time): + Time in or at which the job should run for the first time. This parameter will be + interpreted depending on its type. + ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + ``datetime.datetime`` will be interpreted as a specific date and time at which the + job should run. + ``datetime.time`` will be interpreted as a specific time at which the job should + run. This could be either today or, if the time has already passed, tomorrow. + Defaults to ``interval`` + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + name (Optional[str]): The name of the new job. Defaults to ``callback.__name__`` + + Returns: + Job: The new ``Job`` instance that has been added to the job queue. + """ + job = Job(callback, interval=interval, repeat=True, context=context, name=name) + self._put(job, next_t=first) + return job + + def daily_job(self, callback, time, days=Days.EVERY_DAY, context=None, name=None): + """Creates a new ``Job`` that runs once and adds it to the queue. + + Args: + callback (function): The callback function that should be executed by the new job. It + should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` + instance. It can be used to access it's ``context`` or terminate the job. + time (datetime.time): Time of day at which the job should run. + days (Optional[tuple[int]]): Defines on which days of the week the job should run. + Defaults to ``Days.EVERY_DAY`` + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + name (Optional[str]): The name of the new job. Defaults to ``callback.__name__`` + + Returns: + Job: The new ``Job`` instance that has been added to the job queue. + """ + job = Job(callback, + interval=datetime.timedelta(days=1), + repeat=True, + days=days, + context=context, + name=name) + self._put(job, next_t=time) + return job + + def update_job_due_time(self, job, due): + """ + Changes the due time of a job. + + Args: + job (Job): The job of which the due time should be changed. Must already exist in the + queue. + due (float): The new due time as a timestamp + + Returns: + float: The old due time of the job as a timestamp + """ + + with self.__queue_lock: + cache = list() + # Pop jobs from the priority queue one by one and check if it's the one we need. + # All other jobs are stashed away in the 'cache' list. Once the right job is found, + # it's put back into the queue with the new due time, together with all cached jobs. + try: + while True: + due_test, job_test = self.queue.get(block=False) + + if job_test is job: + self.queue.put((due, job_test)) + old_due = due_test + break + + else: + cache.append((due_test, job_test)) + + except Empty: + raise ValueError("Specified 'job' doesn't exist in the job queue") + + finally: + for item in reversed(cache): + self.queue.put(item) + + # Make sure the queue wakes up if the updated due time requires it + self._set_next_peek(due) + return old_due + def _set_next_peek(self, t): """ Set next peek if not defined or `t` is before next peek. In case the next peek was set, also trigger the `self.__tick` event. - """ with self.__next_peek_lock: if not self._next_peek or self._next_peek > t: @@ -120,48 +283,56 @@ class JobQueue(object): Run all jobs that are due and re-enqueue them with their interval. """ - now = time.time() + with self.__queue_lock: + now = time.time() - self.logger.debug('Ticking jobs with t=%f', now) + self.logger.debug('Ticking jobs with t=%f', now) - while True: - try: - t, job = self.queue.get(False) - except Empty: - break - - self.logger.debug('Peeked at %s with t=%f', job.name, t) - - if t > now: - # we can get here in two conditions: - # 1. At the second or later pass of the while loop, after we've already processed - # the job(s) we were supposed to at this time. - # 2. At the first iteration of the loop only if `self.put()` had triggered - # `self.__tick` because `self._next_peek` wasn't set - self.logger.debug("Next task isn't due yet. Finished!") - self.queue.put((t, job)) - self._set_next_peek(t) - break - - if job._remove.is_set(): - self.logger.debug('Removing job %s', job.name) - continue - - if job.enabled: + while True: try: - current_week_day = datetime.datetime.now().weekday() - if any(day == current_week_day for day in job.days): - self.logger.debug('Running job %s', job.name) - job.run(self.bot) - except: - self.logger.exception('An uncaught error was raised while executing job %s', - job.name) + t, job = self.queue.get(False) - else: - self.logger.debug('Skipping disabled job %s', job.name) + except Empty: + break - if job.repeat: - self.put(job) + self.logger.debug('Peeked at %s with t=%f', job.name, t) + + if t > now: + # We can get here in two conditions: + # 1. At the second or later pass of the while loop, after we've already + # processed the job(s) we were supposed to at this time. + # 2. At the first iteration of the loop only if `self.put()` had triggered + # `self.__tick` because `self._next_peek` wasn't set + self.logger.debug("Next task isn't due yet. Finished!") + self.queue.put((t, job)) + self._set_next_peek(t) + break + + if job._remove.is_set(): + self.logger.debug('Removing job %s', job.name) + job.job_queue = None + continue + + if job.enabled: + try: + current_week_day = datetime.datetime.now().weekday() + if any(day == current_week_day for day in job.days): + self.logger.debug('Running job %s', job.name) + job.run(self.bot) + + except: + self.logger.exception( + 'An uncaught error was raised while executing job %s', job.name) + + else: + self.logger.debug('Skipping disabled job %s', job.name) + + if job.repeat: + self._put(job, last_t=t) + + else: + self.logger.debug('Dropping non-repeating job %s', job.name) + job.job_queue = None def start(self): """ @@ -216,18 +387,21 @@ class JobQueue(object): def jobs(self): """Returns a tuple of all jobs that are currently in the ``JobQueue``""" - return tuple(job[1] for job in self.queue.queue if job) + with self.__queue_lock: + return tuple(job[1] for job in self.queue.queue if job) class Job(object): """This class encapsulates a Job Attributes: - callback (function): - interval (float): - days: (tuple) - repeat (bool): - name (str): + callback (function): The function that the job executes when it's due + interval (int, float, datetime.timedelta): The interval in which the job runs + days (tuple[int]): A tuple of ``int`` values that determine on which days of the week the + job runs + repeat (bool): If the job runs periodically or only once + name (str): The name of this job + job_queue (JobQueue): The ``JobQueue`` this job belongs to enabled (bool): Boolean property that decides if this job is currently active Args: @@ -242,39 +416,37 @@ class Job(object): (``True``) or only once (``False``). Defaults to ``True`` context (Optional[object]): Additional data needed for the callback function. Can be accessed through ``job.context`` in the callback. Defaults to ``None`` - days (Tuple): Defines on which days the job should be ran. - + days (Optional[tuple[int]]): Defines on which days of the week the job should run. + Defaults to ``Days.EVERY_DAY`` + name (Optional[str]): The name of this job. Defaults to ``callback.__name__`` """ - job_queue = None - def __init__(self, callback, interval=None, repeat=True, context=None, days=Days.EVERY_DAY): + def __init__(self, + callback, + interval=None, + repeat=True, + context=None, + days=Days.EVERY_DAY, + name=None): + self.callback = callback + self.context = context + self.name = name or callback.__name__ + + self._repeat = repeat + self._interval = None self.interval = interval self.repeat = repeat - self.context = context - - if not isinstance(days, tuple): - raise ValueError("The 'days argument should be of type 'tuple'") - - if not all(isinstance(day, int) for day in days): - raise ValueError("The elements of the 'days' argument should be of type 'int'") - - if not all(0 <= day <= 6 for day in days): - raise ValueError("The elements of the 'days' argument should be from 0 up to and " - "including 6") - - if interval is None and repeat: - raise ValueError("You must either set an interval or set 'repeat' to 'False'") - - if not (isinstance(interval, Number) or isinstance(interval, datetime.timedelta)): - raise ValueError("The 'interval' argument must be of type 'datetime.timedelta'," - " 'int' or 'float'") + self._days = None self.days = days - self.name = callback.__name__ + + self._job_queue = None + self._remove = Event() self._enabled = Event() self._enabled.set() + self._immediate_run_in_progress = Event() def run(self, bot): """Executes the callback function""" @@ -284,20 +456,175 @@ class Job(object): """ Schedules this job for removal from the ``JobQueue``. It will be removed without executing its callback function again. - """ self._remove.set() - def is_enabled(self): + def _wrap_callback(self, old_due_promise, keep_schedule, skip_next): + """Wraps the callback function into two other functions and returns the result""" + original_callback = self.callback + + def rescheduled(bot, job): + """The callback function for the rescheduled run""" + # Run the original callback function + original_callback(bot, job) + + # If job is non-repeating, restore the callback, ignore everything else and bail + if not self.repeat: + self.callback = original_callback + return + + # Adjust the interval so that the next run is scheduled like it was before + if keep_schedule: + original_interval = self.interval + + try: + old_due = old_due_promise.result(timeout=1.0) + + except TimeoutError: + # Possible race condition when the JobQueue wants to run this job before it + # could run the Promise in the main thread. The JobQueue thread waits for + # the Promise to resolve, but the main thread waits for the JobQueue to release + # the __queue_lock so it can reschedule this job. + # In case that happens, simply wrap the callback function again and pretend + # like nothing happened yet. + self.callback = original_callback + self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next) + return + + self.interval = old_due - time.time() + + def next_callback(bot, job): + """The callback function for the run that was originally next""" + + # Restore callback and interval + self.callback = original_callback + + if keep_schedule: + self.interval = original_interval + + if not skip_next: + original_callback(bot, job) + + self._immediate_run_in_progress.clear() + + self.callback = next_callback + + return rescheduled + + def run_immediately(self, keep_schedule=True, skip_next=True): + """ + Puts this job to the front of the job queue, scheduled to run immediately. + + Args: + keep_schedule (Optional[bool]): If set to ``True``, which is the default, the job will + be re-scheduled so that the original schedule (as defined by the starting time and + interval) is not changed. If set to ``False``, the schedule will reset so that the + next scheduled time is calculated from the current time and the interval. + This parameter is ignored if the job is not repeating. + skip_next (Optional[bool]): If set to ``True``, which is the default, the next + scheduled execution will be skipped, effectively re-scheduling it to now. + If set to ``False``, this execution is an additional, completely unscheduled + execution. + This parameter is ignored if the job is not repeating. + + Raises: + NotImplementedError: If you use this method a second time before the job either ran + the originally scheduled execution once, or has skipped it. + ValueError: If the job is disabled at the moment + """ + if self._immediate_run_in_progress.is_set(): + raise NotImplementedError("You can't use 'run_immediately' again until the job has " + "returned to it's normal state. Sorry about that.") + + if not self.enabled: + raise ValueError("Job is disabled") + + self._immediate_run_in_progress.set() + # Wrap the old due time in a Promise + old_due_promise = Promise( + self.job_queue.update_job_due_time, # pylint: disable=no-member + [self, time.time()], + {}) + + # Wrap the callback in the wrapper function + self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next) + + # Run the promise to reschedule the job + old_due_promise.run() + + @property + def enabled(self): return self._enabled.is_set() - def set_enabled(self, status): + @enabled.setter + def enabled(self, status): if status: self._enabled.set() else: self._enabled.clear() - enabled = property(is_enabled, set_enabled) + @property + def interval(self): + return self._interval + + @interval.setter + def interval(self, interval): + if interval is None and self.repeat: + raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'") + + if not (interval is None or isinstance(interval, Number) or + isinstance(interval, datetime.timedelta)): + raise ValueError("The 'interval' must be of type 'datetime.timedelta'," + " 'int' or 'float'") + + self._interval = interval + + @property + def interval_seconds(self): + if isinstance(self.interval, datetime.timedelta): + return self.interval.total_seconds() + else: + return self.interval + + @property + def repeat(self): + return self._repeat + + @repeat.setter + def repeat(self, repeat): + if self.interval is None and repeat: + raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set") + self._repeat = repeat + + @property + def days(self): + return self._days + + @days.setter + def days(self, days): + if not isinstance(days, tuple): + raise ValueError("The 'days' argument should be of type 'tuple'") + + if not all(isinstance(day, int) for day in days): + raise ValueError("The elements of the 'days' argument should be of type 'int'") + + if not all(0 <= day <= 6 for day in days): + raise ValueError("The elements of the 'days' argument should be from 0 up to and " + "including 6") + + self._days = days + + @property + def job_queue(self): + """ :rtype: JobQueue """ + return self._job_queue + + @job_queue.setter + def job_queue(self, job_queue): + if not self._job_queue or not job_queue: + self._job_queue = job_queue + else: + raise ValueError("The 'job_queue' attribute can only be set once.") def __lt__(self, other): return False diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 6bf34dd82..454f88a59 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -178,10 +178,10 @@ class JobQueueTest(BaseTest, unittest.TestCase): def test_time_unit_int(self): # Testing seconds in int - seconds_interval = 2 - expected_time = time.time() + seconds_interval + delta = 2 + expected_time = time.time() + delta - self.jq.put(Job(self.job5, seconds_interval, repeat=False)) + self.jq.put(Job(self.job5, delta, repeat=False)) sleep(2.5) self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) @@ -229,6 +229,128 @@ class JobQueueTest(BaseTest, unittest.TestCase): self.jq.put(Job(self.job5, repeat=False), next_t=next_t) self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1) + def test_one_time_job(self): + delta = 2 + expected_time = time.time() + delta + + self.jq.one_time_job(self.job5, delta) + sleep(2.5) + self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) + + def test_repeating_job(self): + interval = 0.1 + first = 1.5 + + self.jq.repeating_job(self.job1, interval, first=first) + sleep(2.505) + self.assertAlmostEqual(self.result, 10, delta=1) + + def test_daily_job(self): + delta = 1 + current_time = datetime.datetime.now().time() + time_of_day = datetime.time(current_time.hour, current_time.minute, + current_time.second + delta, current_time.microsecond) + + expected_time = time.time() + 60 * 60 * 24 + delta + + self.jq.daily_job(self.job1, time_of_day) + sleep(2 * delta) + self.assertEqual(self.result, 1) + self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1) + + def test_update_job_due_time(self): + job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1)) + + sleep(0.5) + self.assertEqual(self.result, 0) + + self.jq.update_job_due_time(job, time.time() + 1) + + sleep(0.5) + self.assertEqual(self.result, 0) + + sleep(1.5) + self.assertEqual(self.result, 1) + + def test_job_run_immediately_one_time(self): + job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1)) + + sleep(0.5) + self.assertEqual(self.result, 0) + + job.run_immediately() + + sleep(0.5) + self.assertEqual(self.result, 1) + + def test_job_run_immediately_skip(self): + job = self.jq.repeating_job(self.job1, 1, first=2) + + sleep(0.5) # 0.5s | no runs + self.assertEqual(self.result, 0) + + job.run_immediately(keep_schedule=False, skip_next=True) + + sleep(0.5) # 1s | first run at 0.5s, rescheduled with interval=1 but skipping the next run + self.assertEqual(self.result, 1) + + sleep(1) # 2s | run at 1.5s was skipped + self.assertEqual(self.result, 1) + + sleep(1) # 3s | run at 2.5s was back to normal + self.assertEqual(self.result, 2) + + sleep(1) # 4s | just to confirm + self.assertEqual(self.result, 3) + + def test_job_run_immediately_keep(self): + job = self.jq.repeating_job(self.job1, 1) + + sleep(0.5) # 0.5s | no runs + self.assertEqual(self.result, 0) + + job.run_immediately(keep_schedule=True, skip_next=False) + + # 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule + sleep(0.25) + self.assertEqual(self.result, 1) + + sleep(0.5) # 1.25s | run at 1s, rescheduled with interval=1 + self.assertEqual(self.result, 2) + + sleep(0.5) # 1.75s | last run still at 1s + self.assertEqual(self.result, 2) + + sleep(1) # 2.25s | run at 2s was back to normal + self.assertEqual(self.result, 3) + + sleep(1) # 3.25s | just to confirm + self.assertEqual(self.result, 4) + + def test_job_run_immediately_keep_skip(self): + job = self.jq.repeating_job(self.job1, 1) + + sleep(0.5) # 0.5s | no runs + self.assertEqual(self.result, 0) + + job.run_immediately(keep_schedule=True, skip_next=True) + + # 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule... + sleep(0.25) + self.assertEqual(self.result, 1) + + sleep(0.5) # 1.25s | ...but run at 1s was skipped, rescheduled with interval=1 + self.assertEqual(self.result, 1) + + sleep(0.5) # 1.75s | last run still at 1s + self.assertEqual(self.result, 1) + + sleep(1) # 2.25s | the run at 2s was back to normal + self.assertEqual(self.result, 2) + + sleep(1) # 3.25s | just to confirm + self.assertEqual(self.result, 3) + if __name__ == '__main__': unittest.main()