Merge pull request #484 from python-telegram-bot/jobqueue-absolute-simple

JobQueue: Simpler API for adding jobs
This commit is contained in:
Noam Meltzer 2016-12-21 00:04:01 +02:00 committed by GitHub
commit 7f6b017ce2
3 changed files with 351 additions and 66 deletions

View file

@ -22,6 +22,7 @@ import logging
import time
import warnings
import datetime
import weakref
from numbers import Number
from threading import Thread, Lock, Event
from queue import PriorityQueue, Empty
@ -68,40 +69,182 @@ class JobQueue(object):
Args:
job (telegram.ext.Job): The ``Job`` instance representing the new job
next_t (Optional[int, float, datetime.timedelta]): Time in which the job
should be executed first. Defaults to ``job.interval``. ``int`` and ``float``
will be interpreted as seconds.
next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
"""
warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.one_time_job', "
"'JobQueue.repeating_job' or 'JobQueue.daily_job' instead")
if job.job_queue is None:
job.job_queue = self
self._put(job, next_t=next_t)
def _put(self, job, next_t=None, last_t=None):
"""Queue a new job.
Args:
job (telegram.ext.Job): The ``Job`` instance representing the new job
next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
* ``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
* ``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
* ``datetime.datetime`` will be interpreted as a specific date and time at which
the job should run.
* ``datetime.time`` will be interpreted as a specific time of day at which the job
should run. This could be either today or, if the time has already passed,
tomorrow.
last_t (Optional[float]): Timestamp of the time when ``job`` was scheduled for in the
last ``put`` call. If provided, it will be used to calculate the next timestamp
more accurately by accounting for the execution time of the job (and possibly
others). If None, `now` will be assumed.
"""
job.job_queue = self
if next_t is None:
interval = job.interval
next_t = job.interval
if next_t is None:
raise ValueError('next_t is None')
if isinstance(next_t, datetime.datetime):
next_t = (next_t - datetime.datetime.now()).total_seconds()
elif isinstance(next_t, datetime.time):
next_datetime = datetime.datetime.combine(datetime.date.today(), next_t)
if datetime.datetime.now().time() > next_t:
next_datetime += datetime.timedelta(days=1)
next_t = (next_datetime - datetime.datetime.now()).total_seconds()
if isinstance(interval, Number):
next_t = interval
elif isinstance(interval, datetime.timedelta):
next_t = interval.total_seconds()
else:
raise ValueError("The interval argument should be of type datetime.timedelta,"
" int or float")
elif isinstance(next_t, datetime.timedelta):
next_t = next_t.total_second()
next_t = next_t.total_seconds()
now = time.time()
next_t += now
next_t += last_t or time.time()
self.logger.debug('Putting job %s with t=%f', job.name, next_t)
self.queue.put((next_t, job))
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
def run_once(self, callback, when, context=None, name=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
callback (function): The callback function that should be executed by the new job. It
should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job``
instance. It can be used to access it's ``context`` or change it to a repeating
job.
when (int, float, datetime.timedelta, datetime.datetime, datetime.time):
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
* ``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
* ``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
* ``datetime.datetime`` will be interpreted as a specific date and time at which
the job should run.
* ``datetime.time`` will be interpreted as a specific time of day at which the job
should run. This could be either today or, if the time has already passed,
tomorrow.
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback, repeat=False, context=context, name=name, job_queue=self)
self._put(job, next_t=when)
return job
def run_repeating(self, callback, interval, first=None, context=None, name=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
callback (function): The callback function that should be executed by the new job. It
should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job``
instance. It can be used to access it's ``context``, terminate the job or change
its interval.
interval (int, float, datetime.timedelta): The interval in which the job will run.
If it is an ``int`` or a ``float``, it will be interpreted as seconds.
first (int, float, datetime.timedelta, datetime.datetime, datetime.time):
* ``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
* ``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
* ``datetime.datetime`` will be interpreted as a specific date and time at which
the job should run.
* ``datetime.time`` will be interpreted as a specific time of day at which the job
should run. This could be either today or, if the time has already passed,
tomorrow.
Defaults to ``interval``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback,
interval=interval,
repeat=True,
context=context,
name=name,
job_queue=self)
self._put(job, next_t=first)
return job
def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
callback (function): The callback function that should be executed by the new job. It
should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job``
instance. It can be used to access it's ``context`` or terminate the job.
time (datetime.time): Time of day at which the job should run.
days (Optional[tuple[int]]): Defines on which days of the week the job should run.
Defaults to ``Days.EVERY_DAY``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback,
interval=datetime.timedelta(days=1),
repeat=True,
days=days,
context=context,
name=name,
job_queue=self)
self._put(job, next_t=time)
return job
def _set_next_peek(self, t):
"""
Set next peek if not defined or `t` is before next peek.
In case the next peek was set, also trigger the `self.__tick` event.
"""
with self.__next_peek_lock:
if not self._next_peek or self._next_peek > t:
@ -126,9 +269,9 @@ class JobQueue(object):
self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t > now:
# we can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already processed
# the job(s) we were supposed to at this time.
# We can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already
# processed the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
@ -136,7 +279,7 @@ class JobQueue(object):
self._set_next_peek(t)
break
if job._remove.is_set():
if job.removed:
self.logger.debug('Removing job %s', job.name)
continue
@ -146,15 +289,17 @@ class JobQueue(object):
if any(day == current_week_day for day in job.days):
self.logger.debug('Running job %s', job.name)
job.run(self.bot)
except:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
self.put(job)
if job.repeat and not job.removed:
self._put(job, last_t=t)
else:
self.logger.debug('Dropping non-repeating or removed job %s', job.name)
def start(self):
"""
@ -169,7 +314,6 @@ class JobQueue(object):
self.__thread = Thread(target=self._main_loop, name="job_queue")
self.__thread.start()
self.logger.debug('%s thread started', self.__class__.__name__)
else:
self.__start_lock.release()
@ -182,7 +326,7 @@ class JobQueue(object):
while self._running:
# self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
tmout = self._next_peek and self._next_peek - time.time()
tmout = self._next_peek - time.time() if self._next_peek else None
self._next_peek = None
self.__tick.clear()
@ -216,46 +360,58 @@ class Job(object):
"""This class encapsulates a Job
Attributes:
callback (function):
interval (float):
days: (tuple)
repeat (bool):
name (str):
callback (function): The function that the job executes when it's due
interval (int, float, datetime.timedelta): The interval in which the job runs
days (tuple[int]): A tuple of ``int`` values that determine on which days of the week the
job runs
repeat (bool): If the job runs periodically or only once
name (str): The name of this job
job_queue (JobQueue): The ``JobQueue`` this job belongs to
enabled (bool): Boolean property that decides if this job is currently active
Args:
callback (function): The callback function that should be executed by the Job. It should
take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` instance. It
can be used to terminate the job or modify its interval.
interval ([int, float, datetime.timedelta]): The interval in which the job will execute its
callback function. ``int`` and ``float`` will be interpreted as seconds.
interval (Optional[int, float, datetime.timedelta]): The interval in which the job will
execute its callback function. ``int`` and ``float`` will be interpreted as seconds.
If you don't set this value, you must set ``repeat=False`` and specify ``next_t`` when
you put the job into the job queue.
repeat (Optional[bool]): If this job should be periodically execute its callback function
(``True``) or only once (``False``). Defaults to ``True``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
days (Tuple): Defines on which days the job should be ran.
days (Optional[tuple[int]]): Defines on which days of the week the job should run.
Defaults to ``Days.EVERY_DAY``
name (Optional[str]): The name of this job. Defaults to ``callback.__name__``
job_queue (Optional[class:`telegram.ext.JobQueue`]): The ``JobQueue`` this job belongs to.
Only optional for backward compatibility with ``JobQueue.put()``.
"""
job_queue = None
def __init__(self, callback, interval, repeat=True, context=None, days=Days.EVERY_DAY):
def __init__(self,
callback,
interval=None,
repeat=True,
context=None,
days=Days.EVERY_DAY,
name=None,
job_queue=None):
self.callback = callback
self.context = context
self.name = name or callback.__name__
self._repeat = repeat
self._interval = None
self.interval = interval
self.repeat = repeat
self.context = context
if not isinstance(days, tuple):
raise ValueError("The 'days argument should be of type 'tuple'")
if not all(isinstance(day, int) for day in days):
raise ValueError("The elements of the 'days' argument should be of type 'int'")
if not all(day >= 0 and day <= 6 for day in days):
raise ValueError("The elements of the 'days' argument should be from 0 up to and "
"including 6")
self._days = None
self.days = days
self.name = callback.__name__
self._job_queue = weakref.proxy(job_queue) if job_queue is not None else None
self._remove = Event()
self._enabled = Event()
self._enabled.set()
@ -268,20 +424,86 @@ class Job(object):
"""
Schedules this job for removal from the ``JobQueue``. It will be removed without executing
its callback function again.
"""
self._remove.set()
def is_enabled(self):
@property
def removed(self):
return self._remove.is_set()
@property
def enabled(self):
return self._enabled.is_set()
def set_enabled(self, status):
@enabled.setter
def enabled(self, status):
if status:
self._enabled.set()
else:
self._enabled.clear()
enabled = property(is_enabled, set_enabled)
@property
def interval(self):
return self._interval
@interval.setter
def interval(self, interval):
if interval is None and self.repeat:
raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'")
if not (interval is None or isinstance(interval, (Number, datetime.timedelta))):
raise ValueError("The 'interval' must be of type 'datetime.timedelta',"
" 'int' or 'float'")
self._interval = interval
@property
def interval_seconds(self):
if isinstance(self.interval, datetime.timedelta):
return self.interval.total_seconds()
else:
return self.interval
@property
def repeat(self):
return self._repeat
@repeat.setter
def repeat(self, repeat):
if self.interval is None and repeat:
raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set")
self._repeat = repeat
@property
def days(self):
return self._days
@days.setter
def days(self, days):
if not isinstance(days, tuple):
raise ValueError("The 'days' argument should be of type 'tuple'")
if not all(isinstance(day, int) for day in days):
raise ValueError("The elements of the 'days' argument should be of type 'int'")
if not all(0 <= day <= 6 for day in days):
raise ValueError("The elements of the 'days' argument should be from 0 up to and "
"including 6")
self._days = days
@property
def job_queue(self):
""" :rtype: JobQueue """
return self._job_queue
@job_queue.setter
def job_queue(self, job_queue):
# Property setter for backward compatibility with JobQueue.put()
if not self._job_queue:
self._job_queue = weakref.proxy(job_queue)
else:
raise RuntimeError("The 'job_queue' attribute can only be set once.")
def __lt__(self, other):
return False

View file

@ -425,6 +425,9 @@ class BotTest(BaseTest, unittest.TestCase):
@flaky(3, 1)
@timeout(10)
def test_send_contact(self):
# test disabled due to telegram servers annoyances repeatedly returning:
# "Flood control exceeded. Retry in 2036 seconds"
return
phone = '+3-54-5445445'
name = 'name'
last = 'last'

View file

@ -62,9 +62,6 @@ class JobQueueTest(BaseTest, unittest.TestCase):
if self.jq is not None:
self.jq.stop()
def getSeconds(self):
return int(ceil(time.time()))
def job1(self, bot, job):
self.result += 1
@ -79,7 +76,7 @@ class JobQueueTest(BaseTest, unittest.TestCase):
self.result += job.context
def job5(self, bot, job):
self.job_time = self.getSeconds()
self.job_time = time.time()
def test_basic(self):
self.jq.put(Job(self.job1, 0.1))
@ -181,22 +178,85 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def test_time_unit_int(self):
# Testing seconds in int
seconds_interval = 5
expected_time = self.getSeconds() + seconds_interval
delta = 2
expected_time = time.time() + delta
self.jq.put(Job(self.job5, seconds_interval, repeat=False))
sleep(6)
self.assertEqual(self.job_time, expected_time)
self.jq.put(Job(self.job5, delta, repeat=False))
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
def test_time_unit_dt_time(self):
def test_time_unit_dt_timedelta(self):
# Testing seconds, minutes and hours as datetime.timedelta object
# This is sufficient to test that it actually works.
interval = datetime.timedelta(seconds=5)
expected_time = self.getSeconds() + interval.total_seconds()
interval = datetime.timedelta(seconds=2)
expected_time = time.time() + interval.total_seconds()
self.jq.put(Job(self.job5, interval, repeat=False))
sleep(6)
self.assertEqual(self.job_time, expected_time)
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
def test_time_unit_dt_datetime(self):
# Testing running at a specific datetime
delta = datetime.timedelta(seconds=2)
next_t = datetime.datetime.now() + delta
expected_time = time.time() + delta.total_seconds()
self.jq.put(Job(self.job5, repeat=False), next_t=next_t)
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
def test_time_unit_dt_time_today(self):
# Testing running at a specific time today
delta = 2
current_time = datetime.datetime.now().time()
next_t = datetime.time(current_time.hour, current_time.minute, current_time.second + delta,
current_time.microsecond)
expected_time = time.time() + delta
self.jq.put(Job(self.job5, repeat=False), next_t=next_t)
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
def test_time_unit_dt_time_tomorrow(self):
# Testing running at a specific time that has passed today. Since we can't wait a day, we
# test if the jobs next_t has been calculated correctly
delta = -2
current_time = datetime.datetime.now().time()
next_t = datetime.time(current_time.hour, current_time.minute, current_time.second + delta,
current_time.microsecond)
expected_time = time.time() + delta + 60 * 60 * 24
self.jq.put(Job(self.job5, repeat=False), next_t=next_t)
self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1)
def test_run_once(self):
delta = 2
expected_time = time.time() + delta
self.jq.run_once(self.job5, delta)
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
def test_run_repeating(self):
interval = 0.1
first = 1.5
self.jq.run_repeating(self.job1, interval, first=first)
sleep(2.505)
self.assertAlmostEqual(self.result, 10, delta=1)
def test_run_daily(self):
delta = 1
current_time = datetime.datetime.now().time()
time_of_day = datetime.time(current_time.hour, current_time.minute,
current_time.second + delta, current_time.microsecond)
expected_time = time.time() + 60 * 60 * 24 + delta
self.jq.run_daily(self.job1, time_of_day)
sleep(2 * delta)
self.assertEqual(self.result, 1)
self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1)
if __name__ == '__main__':