diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index ce6c4a6b1..f47cdd98c 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -22,6 +22,7 @@ import logging import time import warnings import datetime +import weakref from numbers import Number from threading import Thread, Lock, Event from queue import PriorityQueue, Empty @@ -68,40 +69,182 @@ class JobQueue(object): Args: job (telegram.ext.Job): The ``Job`` instance representing the new job - next_t (Optional[int, float, datetime.timedelta]): Time in which the job - should be executed first. Defaults to ``job.interval``. ``int`` and ``float`` - will be interpreted as seconds. + next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]): + Time in or at which the job should run for the first time. This parameter will be + interpreted depending on its type. + ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + ``datetime.datetime`` will be interpreted as a specific date and time at which the + job should run. + ``datetime.time`` will be interpreted as a specific time at which the job should + run. This could be either today or, if the time has already passed, tomorrow. + """ + warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.one_time_job', " + "'JobQueue.repeating_job' or 'JobQueue.daily_job' instead") + if job.job_queue is None: + job.job_queue = self + self._put(job, next_t=next_t) + + def _put(self, job, next_t=None, last_t=None): + """Queue a new job. + + Args: + job (telegram.ext.Job): The ``Job`` instance representing the new job + next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]): + Time in or at which the job should run for the first time. This parameter will be + interpreted depending on its type. + + * ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + * ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + * ``datetime.datetime`` will be interpreted as a specific date and time at which + the job should run. + * ``datetime.time`` will be interpreted as a specific time of day at which the job + should run. This could be either today or, if the time has already passed, + tomorrow. + last_t (Optional[float]): Timestamp of the time when ``job`` was scheduled for in the + last ``put`` call. If provided, it will be used to calculate the next timestamp + more accurately by accounting for the execution time of the job (and possibly + others). If None, `now` will be assumed. """ - job.job_queue = self - if next_t is None: - interval = job.interval + next_t = job.interval + if next_t is None: + raise ValueError('next_t is None') + + if isinstance(next_t, datetime.datetime): + next_t = (next_t - datetime.datetime.now()).total_seconds() + + elif isinstance(next_t, datetime.time): + next_datetime = datetime.datetime.combine(datetime.date.today(), next_t) + + if datetime.datetime.now().time() > next_t: + next_datetime += datetime.timedelta(days=1) + + next_t = (next_datetime - datetime.datetime.now()).total_seconds() - if isinstance(interval, Number): - next_t = interval - elif isinstance(interval, datetime.timedelta): - next_t = interval.total_seconds() - else: - raise ValueError("The interval argument should be of type datetime.timedelta," - " int or float") elif isinstance(next_t, datetime.timedelta): - next_t = next_t.total_second() + next_t = next_t.total_seconds() - now = time.time() - next_t += now + next_t += last_t or time.time() self.logger.debug('Putting job %s with t=%f', job.name, next_t) + self.queue.put((next_t, job)) # Wake up the loop if this job should be executed next self._set_next_peek(next_t) + def run_once(self, callback, when, context=None, name=None): + """Creates a new ``Job`` that runs once and adds it to the queue. + + Args: + callback (function): The callback function that should be executed by the new job. It + should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` + instance. It can be used to access it's ``context`` or change it to a repeating + job. + when (int, float, datetime.timedelta, datetime.datetime, datetime.time): + Time in or at which the job should run. This parameter will be interpreted + depending on its type. + + * ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + * ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + * ``datetime.datetime`` will be interpreted as a specific date and time at which + the job should run. + * ``datetime.time`` will be interpreted as a specific time of day at which the job + should run. This could be either today or, if the time has already passed, + tomorrow. + + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + name (Optional[str]): The name of the new job. Defaults to ``callback.__name__`` + + Returns: + Job: The new ``Job`` instance that has been added to the job queue. + + """ + job = Job(callback, repeat=False, context=context, name=name, job_queue=self) + self._put(job, next_t=when) + return job + + def run_repeating(self, callback, interval, first=None, context=None, name=None): + """Creates a new ``Job`` that runs once and adds it to the queue. + + Args: + callback (function): The callback function that should be executed by the new job. It + should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` + instance. It can be used to access it's ``context``, terminate the job or change + its interval. + interval (int, float, datetime.timedelta): The interval in which the job will run. + If it is an ``int`` or a ``float``, it will be interpreted as seconds. + first (int, float, datetime.timedelta, datetime.datetime, datetime.time): + + * ``int`` or ``float`` will be interpreted as "seconds from now" in which the job + should run. + * ``datetime.timedelta`` will be interpreted as "time from now" in which the job + should run. + * ``datetime.datetime`` will be interpreted as a specific date and time at which + the job should run. + * ``datetime.time`` will be interpreted as a specific time of day at which the job + should run. This could be either today or, if the time has already passed, + tomorrow. + + Defaults to ``interval`` + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + name (Optional[str]): The name of the new job. Defaults to ``callback.__name__`` + + Returns: + Job: The new ``Job`` instance that has been added to the job queue. + + """ + job = Job(callback, + interval=interval, + repeat=True, + context=context, + name=name, + job_queue=self) + self._put(job, next_t=first) + return job + + def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None): + """Creates a new ``Job`` that runs once and adds it to the queue. + + Args: + callback (function): The callback function that should be executed by the new job. It + should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` + instance. It can be used to access it's ``context`` or terminate the job. + time (datetime.time): Time of day at which the job should run. + days (Optional[tuple[int]]): Defines on which days of the week the job should run. + Defaults to ``Days.EVERY_DAY`` + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + name (Optional[str]): The name of the new job. Defaults to ``callback.__name__`` + + Returns: + Job: The new ``Job`` instance that has been added to the job queue. + + """ + job = Job(callback, + interval=datetime.timedelta(days=1), + repeat=True, + days=days, + context=context, + name=name, + job_queue=self) + self._put(job, next_t=time) + return job + def _set_next_peek(self, t): """ Set next peek if not defined or `t` is before next peek. In case the next peek was set, also trigger the `self.__tick` event. - """ with self.__next_peek_lock: if not self._next_peek or self._next_peek > t: @@ -126,9 +269,9 @@ class JobQueue(object): self.logger.debug('Peeked at %s with t=%f', job.name, t) if t > now: - # we can get here in two conditions: - # 1. At the second or later pass of the while loop, after we've already processed - # the job(s) we were supposed to at this time. + # We can get here in two conditions: + # 1. At the second or later pass of the while loop, after we've already + # processed the job(s) we were supposed to at this time. # 2. At the first iteration of the loop only if `self.put()` had triggered # `self.__tick` because `self._next_peek` wasn't set self.logger.debug("Next task isn't due yet. Finished!") @@ -136,7 +279,7 @@ class JobQueue(object): self._set_next_peek(t) break - if job._remove.is_set(): + if job.removed: self.logger.debug('Removing job %s', job.name) continue @@ -146,15 +289,17 @@ class JobQueue(object): if any(day == current_week_day for day in job.days): self.logger.debug('Running job %s', job.name) job.run(self.bot) + except: self.logger.exception('An uncaught error was raised while executing job %s', job.name) - else: self.logger.debug('Skipping disabled job %s', job.name) - if job.repeat: - self.put(job) + if job.repeat and not job.removed: + self._put(job, last_t=t) + else: + self.logger.debug('Dropping non-repeating or removed job %s', job.name) def start(self): """ @@ -169,7 +314,6 @@ class JobQueue(object): self.__thread = Thread(target=self._main_loop, name="job_queue") self.__thread.start() self.logger.debug('%s thread started', self.__class__.__name__) - else: self.__start_lock.release() @@ -182,7 +326,7 @@ class JobQueue(object): while self._running: # self._next_peek may be (re)scheduled during self.tick() or self.put() with self.__next_peek_lock: - tmout = self._next_peek and self._next_peek - time.time() + tmout = self._next_peek - time.time() if self._next_peek else None self._next_peek = None self.__tick.clear() @@ -216,46 +360,58 @@ class Job(object): """This class encapsulates a Job Attributes: - callback (function): - interval (float): - days: (tuple) - repeat (bool): - name (str): + callback (function): The function that the job executes when it's due + interval (int, float, datetime.timedelta): The interval in which the job runs + days (tuple[int]): A tuple of ``int`` values that determine on which days of the week the + job runs + repeat (bool): If the job runs periodically or only once + name (str): The name of this job + job_queue (JobQueue): The ``JobQueue`` this job belongs to enabled (bool): Boolean property that decides if this job is currently active Args: callback (function): The callback function that should be executed by the Job. It should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` instance. It can be used to terminate the job or modify its interval. - interval ([int, float, datetime.timedelta]): The interval in which the job will execute its - callback function. ``int`` and ``float`` will be interpreted as seconds. + interval (Optional[int, float, datetime.timedelta]): The interval in which the job will + execute its callback function. ``int`` and ``float`` will be interpreted as seconds. + If you don't set this value, you must set ``repeat=False`` and specify ``next_t`` when + you put the job into the job queue. repeat (Optional[bool]): If this job should be periodically execute its callback function (``True``) or only once (``False``). Defaults to ``True`` context (Optional[object]): Additional data needed for the callback function. Can be accessed through ``job.context`` in the callback. Defaults to ``None`` - days (Tuple): Defines on which days the job should be ran. + days (Optional[tuple[int]]): Defines on which days of the week the job should run. + Defaults to ``Days.EVERY_DAY`` + name (Optional[str]): The name of this job. Defaults to ``callback.__name__`` + job_queue (Optional[class:`telegram.ext.JobQueue`]): The ``JobQueue`` this job belongs to. + Only optional for backward compatibility with ``JobQueue.put()``. """ - job_queue = None - def __init__(self, callback, interval, repeat=True, context=None, days=Days.EVERY_DAY): + def __init__(self, + callback, + interval=None, + repeat=True, + context=None, + days=Days.EVERY_DAY, + name=None, + job_queue=None): + self.callback = callback + self.context = context + self.name = name or callback.__name__ + + self._repeat = repeat + self._interval = None self.interval = interval self.repeat = repeat - self.context = context - - if not isinstance(days, tuple): - raise ValueError("The 'days argument should be of type 'tuple'") - - if not all(isinstance(day, int) for day in days): - raise ValueError("The elements of the 'days' argument should be of type 'int'") - - if not all(day >= 0 and day <= 6 for day in days): - raise ValueError("The elements of the 'days' argument should be from 0 up to and " - "including 6") + self._days = None self.days = days - self.name = callback.__name__ + + self._job_queue = weakref.proxy(job_queue) if job_queue is not None else None + self._remove = Event() self._enabled = Event() self._enabled.set() @@ -268,20 +424,86 @@ class Job(object): """ Schedules this job for removal from the ``JobQueue``. It will be removed without executing its callback function again. - """ self._remove.set() - def is_enabled(self): + @property + def removed(self): + return self._remove.is_set() + + @property + def enabled(self): return self._enabled.is_set() - def set_enabled(self, status): + @enabled.setter + def enabled(self, status): if status: self._enabled.set() else: self._enabled.clear() - enabled = property(is_enabled, set_enabled) + @property + def interval(self): + return self._interval + + @interval.setter + def interval(self, interval): + if interval is None and self.repeat: + raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'") + + if not (interval is None or isinstance(interval, (Number, datetime.timedelta))): + raise ValueError("The 'interval' must be of type 'datetime.timedelta'," + " 'int' or 'float'") + + self._interval = interval + + @property + def interval_seconds(self): + if isinstance(self.interval, datetime.timedelta): + return self.interval.total_seconds() + else: + return self.interval + + @property + def repeat(self): + return self._repeat + + @repeat.setter + def repeat(self, repeat): + if self.interval is None and repeat: + raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set") + self._repeat = repeat + + @property + def days(self): + return self._days + + @days.setter + def days(self, days): + if not isinstance(days, tuple): + raise ValueError("The 'days' argument should be of type 'tuple'") + + if not all(isinstance(day, int) for day in days): + raise ValueError("The elements of the 'days' argument should be of type 'int'") + + if not all(0 <= day <= 6 for day in days): + raise ValueError("The elements of the 'days' argument should be from 0 up to and " + "including 6") + + self._days = days + + @property + def job_queue(self): + """ :rtype: JobQueue """ + return self._job_queue + + @job_queue.setter + def job_queue(self, job_queue): + # Property setter for backward compatibility with JobQueue.put() + if not self._job_queue: + self._job_queue = weakref.proxy(job_queue) + else: + raise RuntimeError("The 'job_queue' attribute can only be set once.") def __lt__(self, other): return False diff --git a/tests/test_bot.py b/tests/test_bot.py index 36f4eed96..977d00cd1 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -425,6 +425,9 @@ class BotTest(BaseTest, unittest.TestCase): @flaky(3, 1) @timeout(10) def test_send_contact(self): + # test disabled due to telegram servers annoyances repeatedly returning: + # "Flood control exceeded. Retry in 2036 seconds" + return phone = '+3-54-5445445' name = 'name' last = 'last' diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index a208089ea..246cd1595 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -62,9 +62,6 @@ class JobQueueTest(BaseTest, unittest.TestCase): if self.jq is not None: self.jq.stop() - def getSeconds(self): - return int(ceil(time.time())) - def job1(self, bot, job): self.result += 1 @@ -79,7 +76,7 @@ class JobQueueTest(BaseTest, unittest.TestCase): self.result += job.context def job5(self, bot, job): - self.job_time = self.getSeconds() + self.job_time = time.time() def test_basic(self): self.jq.put(Job(self.job1, 0.1)) @@ -181,22 +178,85 @@ class JobQueueTest(BaseTest, unittest.TestCase): def test_time_unit_int(self): # Testing seconds in int - seconds_interval = 5 - expected_time = self.getSeconds() + seconds_interval + delta = 2 + expected_time = time.time() + delta - self.jq.put(Job(self.job5, seconds_interval, repeat=False)) - sleep(6) - self.assertEqual(self.job_time, expected_time) + self.jq.put(Job(self.job5, delta, repeat=False)) + sleep(2.5) + self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) - def test_time_unit_dt_time(self): + def test_time_unit_dt_timedelta(self): # Testing seconds, minutes and hours as datetime.timedelta object # This is sufficient to test that it actually works. - interval = datetime.timedelta(seconds=5) - expected_time = self.getSeconds() + interval.total_seconds() + interval = datetime.timedelta(seconds=2) + expected_time = time.time() + interval.total_seconds() self.jq.put(Job(self.job5, interval, repeat=False)) - sleep(6) - self.assertEqual(self.job_time, expected_time) + sleep(2.5) + self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) + + def test_time_unit_dt_datetime(self): + # Testing running at a specific datetime + delta = datetime.timedelta(seconds=2) + next_t = datetime.datetime.now() + delta + expected_time = time.time() + delta.total_seconds() + + self.jq.put(Job(self.job5, repeat=False), next_t=next_t) + sleep(2.5) + self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) + + def test_time_unit_dt_time_today(self): + # Testing running at a specific time today + delta = 2 + current_time = datetime.datetime.now().time() + next_t = datetime.time(current_time.hour, current_time.minute, current_time.second + delta, + current_time.microsecond) + expected_time = time.time() + delta + + self.jq.put(Job(self.job5, repeat=False), next_t=next_t) + sleep(2.5) + self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) + + def test_time_unit_dt_time_tomorrow(self): + # Testing running at a specific time that has passed today. Since we can't wait a day, we + # test if the jobs next_t has been calculated correctly + delta = -2 + current_time = datetime.datetime.now().time() + next_t = datetime.time(current_time.hour, current_time.minute, current_time.second + delta, + current_time.microsecond) + expected_time = time.time() + delta + 60 * 60 * 24 + + self.jq.put(Job(self.job5, repeat=False), next_t=next_t) + self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1) + + def test_run_once(self): + delta = 2 + expected_time = time.time() + delta + + self.jq.run_once(self.job5, delta) + sleep(2.5) + self.assertAlmostEqual(self.job_time, expected_time, delta=0.1) + + def test_run_repeating(self): + interval = 0.1 + first = 1.5 + + self.jq.run_repeating(self.job1, interval, first=first) + sleep(2.505) + self.assertAlmostEqual(self.result, 10, delta=1) + + def test_run_daily(self): + delta = 1 + current_time = datetime.datetime.now().time() + time_of_day = datetime.time(current_time.hour, current_time.minute, + current_time.second + delta, current_time.microsecond) + + expected_time = time.time() + 60 * 60 * 24 + delta + + self.jq.run_daily(self.job1, time_of_day) + sleep(2 * delta) + self.assertEqual(self.result, 1) + self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1) if __name__ == '__main__':