DAAAANGER ZOOOONE

This commit is contained in:
Jannes Höke 2016-12-14 16:27:45 +01:00
parent c7cd379016
commit 09ddc1b1a8
2 changed files with 534 additions and 85 deletions

View file

@ -23,9 +23,11 @@ import time
import warnings
import datetime
from numbers import Number
from threading import Thread, Lock, Event
from threading import Thread, Lock, RLock, Event
from queue import PriorityQueue, Empty
from telegram.utils.promise import Promise
class Days(object):
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)
@ -56,6 +58,7 @@ class JobQueue(object):
self.logger = logging.getLogger(self.__class__.__name__)
self.__start_lock = Lock()
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
self.__queue_lock = RLock() # to protect self.queue
self.__tick = Event()
self.__thread = None
""":type: Thread"""
@ -69,14 +72,44 @@ class JobQueue(object):
Args:
job (telegram.ext.Job): The ``Job`` instance representing the new job
next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]):
Time in or at which the job should be executed first. Defaults to ``job.interval``.
If it is an ``int`` or a ``float``, it will be interpreted as seconds. If it is
a ``datetime.datetime``, it will be executed at the specified date and time. If it
is a ``datetime.time``, it will execute at the specified time today or, if the time
has already passed, tomorrow.
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
"""
job.job_queue = self
warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.one_time_job', "
"'JobQueue.repeating_job' or 'JobQueue.daily_job' instead")
self._put(job, next_t=next_t)
def _put(self, job, next_t=None, last_t=None):
"""Queue a new job.
Args:
job (telegram.ext.Job): The ``Job`` instance representing the new job
next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
last_t (Optional[float]): Timestamp of the time when ``job`` was scheduled for in the
last ``put`` call. If provided, it will used to calculate the next timestamp more
accurately by accounting for the execution time of the job (and possibly others).
By default, the current timestamp is used.
"""
if job.job_queue is not self:
job.job_queue = self
if next_t is None:
next_t = job.interval
@ -95,20 +128,150 @@ class JobQueue(object):
if isinstance(next_t, datetime.timedelta):
next_t = next_t.total_seconds()
now = time.time()
next_t += now
next_t += last_t or time.time()
self.logger.debug('Putting job %s with t=%f', job.name, next_t)
self.queue.put((next_t, job))
with self.__queue_lock:
self.queue.put((next_t, job))
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
def one_time_job(self, callback, when, context=None, name=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
callback (function): The callback function that should be executed by the new job. It
should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job``
instance. It can be used to access it's ``context`` or change it to a repeating
job.
when (int, float, datetime.timedelta, datetime.datetime, datetime.time):
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback, repeat=False, context=context, name=name)
self._put(job, next_t=when)
return job
def repeating_job(self, callback, interval, first=None, context=None, name=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
callback (function): The callback function that should be executed by the new job. It
should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job``
instance. It can be used to access it's ``context``, terminate the job or change
its interval.
interval (int, float, datetime.timedelta): The interval in which the job will run.
If it is an ``int`` or a ``float``, it will be interpreted as seconds.
first (int, float, datetime.timedelta, datetime.datetime, datetime.time):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
Defaults to ``interval``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback, interval=interval, repeat=True, context=context, name=name)
self._put(job, next_t=first)
return job
def daily_job(self, callback, time, days=Days.EVERY_DAY, context=None, name=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
callback (function): The callback function that should be executed by the new job. It
should take two parameters ``bot`` and ``job``, where ``job`` is the ``Job``
instance. It can be used to access it's ``context`` or terminate the job.
time (datetime.time): Time of day at which the job should run.
days (Optional[tuple[int]]): Defines on which days of the week the job should run.
Defaults to ``Days.EVERY_DAY``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback,
interval=datetime.timedelta(days=1),
repeat=True,
days=days,
context=context,
name=name)
self._put(job, next_t=time)
return job
def update_job_due_time(self, job, due):
"""
Changes the due time of a job.
Args:
job (Job): The job of which the due time should be changed. Must already exist in the
queue.
due (float): The new due time as a timestamp
Returns:
float: The old due time of the job as a timestamp
"""
with self.__queue_lock:
cache = list()
# Pop jobs from the priority queue one by one and check if it's the one we need.
# All other jobs are stashed away in the 'cache' list. Once the right job is found,
# it's put back into the queue with the new due time, together with all cached jobs.
try:
while True:
due_test, job_test = self.queue.get(block=False)
if job_test is job:
self.queue.put((due, job_test))
old_due = due_test
break
else:
cache.append((due_test, job_test))
except Empty:
raise ValueError("Specified 'job' doesn't exist in the job queue")
finally:
for item in reversed(cache):
self.queue.put(item)
# Make sure the queue wakes up if the updated due time requires it
self._set_next_peek(due)
return old_due
def _set_next_peek(self, t):
"""
Set next peek if not defined or `t` is before next peek.
In case the next peek was set, also trigger the `self.__tick` event.
"""
with self.__next_peek_lock:
if not self._next_peek or self._next_peek > t:
@ -120,48 +283,56 @@ class JobQueue(object):
Run all jobs that are due and re-enqueue them with their interval.
"""
now = time.time()
with self.__queue_lock:
now = time.time()
self.logger.debug('Ticking jobs with t=%f', now)
self.logger.debug('Ticking jobs with t=%f', now)
while True:
try:
t, job = self.queue.get(False)
except Empty:
break
self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t > now:
# we can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already processed
# the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self.queue.put((t, job))
self._set_next_peek(t)
break
if job._remove.is_set():
self.logger.debug('Removing job %s', job.name)
continue
if job.enabled:
while True:
try:
current_week_day = datetime.datetime.now().weekday()
if any(day == current_week_day for day in job.days):
self.logger.debug('Running job %s', job.name)
job.run(self.bot)
except:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
t, job = self.queue.get(False)
else:
self.logger.debug('Skipping disabled job %s', job.name)
except Empty:
break
if job.repeat:
self.put(job)
self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t > now:
# We can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already
# processed the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self.queue.put((t, job))
self._set_next_peek(t)
break
if job._remove.is_set():
self.logger.debug('Removing job %s', job.name)
job.job_queue = None
continue
if job.enabled:
try:
current_week_day = datetime.datetime.now().weekday()
if any(day == current_week_day for day in job.days):
self.logger.debug('Running job %s', job.name)
job.run(self.bot)
except:
self.logger.exception(
'An uncaught error was raised while executing job %s', job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
self._put(job, last_t=t)
else:
self.logger.debug('Dropping non-repeating job %s', job.name)
job.job_queue = None
def start(self):
"""
@ -216,18 +387,21 @@ class JobQueue(object):
def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``"""
return tuple(job[1] for job in self.queue.queue if job)
with self.__queue_lock:
return tuple(job[1] for job in self.queue.queue if job)
class Job(object):
"""This class encapsulates a Job
Attributes:
callback (function):
interval (float):
days: (tuple)
repeat (bool):
name (str):
callback (function): The function that the job executes when it's due
interval (int, float, datetime.timedelta): The interval in which the job runs
days (tuple[int]): A tuple of ``int`` values that determine on which days of the week the
job runs
repeat (bool): If the job runs periodically or only once
name (str): The name of this job
job_queue (JobQueue): The ``JobQueue`` this job belongs to
enabled (bool): Boolean property that decides if this job is currently active
Args:
@ -242,39 +416,37 @@ class Job(object):
(``True``) or only once (``False``). Defaults to ``True``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
days (Tuple): Defines on which days the job should be ran.
days (Optional[tuple[int]]): Defines on which days of the week the job should run.
Defaults to ``Days.EVERY_DAY``
name (Optional[str]): The name of this job. Defaults to ``callback.__name__``
"""
job_queue = None
def __init__(self, callback, interval=None, repeat=True, context=None, days=Days.EVERY_DAY):
def __init__(self,
callback,
interval=None,
repeat=True,
context=None,
days=Days.EVERY_DAY,
name=None):
self.callback = callback
self.context = context
self.name = name or callback.__name__
self._repeat = repeat
self._interval = None
self.interval = interval
self.repeat = repeat
self.context = context
if not isinstance(days, tuple):
raise ValueError("The 'days argument should be of type 'tuple'")
if not all(isinstance(day, int) for day in days):
raise ValueError("The elements of the 'days' argument should be of type 'int'")
if not all(0 <= day <= 6 for day in days):
raise ValueError("The elements of the 'days' argument should be from 0 up to and "
"including 6")
if interval is None and repeat:
raise ValueError("You must either set an interval or set 'repeat' to 'False'")
if not (isinstance(interval, Number) or isinstance(interval, datetime.timedelta)):
raise ValueError("The 'interval' argument must be of type 'datetime.timedelta',"
" 'int' or 'float'")
self._days = None
self.days = days
self.name = callback.__name__
self._job_queue = None
self._remove = Event()
self._enabled = Event()
self._enabled.set()
self._immediate_run_in_progress = Event()
def run(self, bot):
"""Executes the callback function"""
@ -284,20 +456,175 @@ class Job(object):
"""
Schedules this job for removal from the ``JobQueue``. It will be removed without executing
its callback function again.
"""
self._remove.set()
def is_enabled(self):
def _wrap_callback(self, old_due_promise, keep_schedule, skip_next):
"""Wraps the callback function into two other functions and returns the result"""
original_callback = self.callback
def rescheduled(bot, job):
"""The callback function for the rescheduled run"""
# Run the original callback function
original_callback(bot, job)
# If job is non-repeating, restore the callback, ignore everything else and bail
if not self.repeat:
self.callback = original_callback
return
# Adjust the interval so that the next run is scheduled like it was before
if keep_schedule:
original_interval = self.interval
try:
old_due = old_due_promise.result(timeout=1.0)
except TimeoutError:
# Possible race condition when the JobQueue wants to run this job before it
# could run the Promise in the main thread. The JobQueue thread waits for
# the Promise to resolve, but the main thread waits for the JobQueue to release
# the __queue_lock so it can reschedule this job.
# In case that happens, simply wrap the callback function again and pretend
# like nothing happened yet.
self.callback = original_callback
self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next)
return
self.interval = old_due - time.time()
def next_callback(bot, job):
"""The callback function for the run that was originally next"""
# Restore callback and interval
self.callback = original_callback
if keep_schedule:
self.interval = original_interval
if not skip_next:
original_callback(bot, job)
self._immediate_run_in_progress.clear()
self.callback = next_callback
return rescheduled
def run_immediately(self, keep_schedule=True, skip_next=True):
"""
Puts this job to the front of the job queue, scheduled to run immediately.
Args:
keep_schedule (Optional[bool]): If set to ``True``, which is the default, the job will
be re-scheduled so that the original schedule (as defined by the starting time and
interval) is not changed. If set to ``False``, the schedule will reset so that the
next scheduled time is calculated from the current time and the interval.
This parameter is ignored if the job is not repeating.
skip_next (Optional[bool]): If set to ``True``, which is the default, the next
scheduled execution will be skipped, effectively re-scheduling it to now.
If set to ``False``, this execution is an additional, completely unscheduled
execution.
This parameter is ignored if the job is not repeating.
Raises:
NotImplementedError: If you use this method a second time before the job either ran
the originally scheduled execution once, or has skipped it.
ValueError: If the job is disabled at the moment
"""
if self._immediate_run_in_progress.is_set():
raise NotImplementedError("You can't use 'run_immediately' again until the job has "
"returned to it's normal state. Sorry about that.")
if not self.enabled:
raise ValueError("Job is disabled")
self._immediate_run_in_progress.set()
# Wrap the old due time in a Promise
old_due_promise = Promise(
self.job_queue.update_job_due_time, # pylint: disable=no-member
[self, time.time()],
{})
# Wrap the callback in the wrapper function
self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next)
# Run the promise to reschedule the job
old_due_promise.run()
@property
def enabled(self):
return self._enabled.is_set()
def set_enabled(self, status):
@enabled.setter
def enabled(self, status):
if status:
self._enabled.set()
else:
self._enabled.clear()
enabled = property(is_enabled, set_enabled)
@property
def interval(self):
return self._interval
@interval.setter
def interval(self, interval):
if interval is None and self.repeat:
raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'")
if not (interval is None or isinstance(interval, Number) or
isinstance(interval, datetime.timedelta)):
raise ValueError("The 'interval' must be of type 'datetime.timedelta',"
" 'int' or 'float'")
self._interval = interval
@property
def interval_seconds(self):
if isinstance(self.interval, datetime.timedelta):
return self.interval.total_seconds()
else:
return self.interval
@property
def repeat(self):
return self._repeat
@repeat.setter
def repeat(self, repeat):
if self.interval is None and repeat:
raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set")
self._repeat = repeat
@property
def days(self):
return self._days
@days.setter
def days(self, days):
if not isinstance(days, tuple):
raise ValueError("The 'days' argument should be of type 'tuple'")
if not all(isinstance(day, int) for day in days):
raise ValueError("The elements of the 'days' argument should be of type 'int'")
if not all(0 <= day <= 6 for day in days):
raise ValueError("The elements of the 'days' argument should be from 0 up to and "
"including 6")
self._days = days
@property
def job_queue(self):
""" :rtype: JobQueue """
return self._job_queue
@job_queue.setter
def job_queue(self, job_queue):
if not self._job_queue or not job_queue:
self._job_queue = job_queue
else:
raise ValueError("The 'job_queue' attribute can only be set once.")
def __lt__(self, other):
return False

View file

@ -178,10 +178,10 @@ class JobQueueTest(BaseTest, unittest.TestCase):
def test_time_unit_int(self):
# Testing seconds in int
seconds_interval = 2
expected_time = time.time() + seconds_interval
delta = 2
expected_time = time.time() + delta
self.jq.put(Job(self.job5, seconds_interval, repeat=False))
self.jq.put(Job(self.job5, delta, repeat=False))
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
@ -229,6 +229,128 @@ class JobQueueTest(BaseTest, unittest.TestCase):
self.jq.put(Job(self.job5, repeat=False), next_t=next_t)
self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1)
def test_one_time_job(self):
delta = 2
expected_time = time.time() + delta
self.jq.one_time_job(self.job5, delta)
sleep(2.5)
self.assertAlmostEqual(self.job_time, expected_time, delta=0.1)
def test_repeating_job(self):
interval = 0.1
first = 1.5
self.jq.repeating_job(self.job1, interval, first=first)
sleep(2.505)
self.assertAlmostEqual(self.result, 10, delta=1)
def test_daily_job(self):
delta = 1
current_time = datetime.datetime.now().time()
time_of_day = datetime.time(current_time.hour, current_time.minute,
current_time.second + delta, current_time.microsecond)
expected_time = time.time() + 60 * 60 * 24 + delta
self.jq.daily_job(self.job1, time_of_day)
sleep(2 * delta)
self.assertEqual(self.result, 1)
self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1)
def test_update_job_due_time(self):
job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1))
sleep(0.5)
self.assertEqual(self.result, 0)
self.jq.update_job_due_time(job, time.time() + 1)
sleep(0.5)
self.assertEqual(self.result, 0)
sleep(1.5)
self.assertEqual(self.result, 1)
def test_job_run_immediately_one_time(self):
job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1))
sleep(0.5)
self.assertEqual(self.result, 0)
job.run_immediately()
sleep(0.5)
self.assertEqual(self.result, 1)
def test_job_run_immediately_skip(self):
job = self.jq.repeating_job(self.job1, 1, first=2)
sleep(0.5) # 0.5s | no runs
self.assertEqual(self.result, 0)
job.run_immediately(keep_schedule=False, skip_next=True)
sleep(0.5) # 1s | first run at 0.5s, rescheduled with interval=1 but skipping the next run
self.assertEqual(self.result, 1)
sleep(1) # 2s | run at 1.5s was skipped
self.assertEqual(self.result, 1)
sleep(1) # 3s | run at 2.5s was back to normal
self.assertEqual(self.result, 2)
sleep(1) # 4s | just to confirm
self.assertEqual(self.result, 3)
def test_job_run_immediately_keep(self):
job = self.jq.repeating_job(self.job1, 1)
sleep(0.5) # 0.5s | no runs
self.assertEqual(self.result, 0)
job.run_immediately(keep_schedule=True, skip_next=False)
# 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule
sleep(0.25)
self.assertEqual(self.result, 1)
sleep(0.5) # 1.25s | run at 1s, rescheduled with interval=1
self.assertEqual(self.result, 2)
sleep(0.5) # 1.75s | last run still at 1s
self.assertEqual(self.result, 2)
sleep(1) # 2.25s | run at 2s was back to normal
self.assertEqual(self.result, 3)
sleep(1) # 3.25s | just to confirm
self.assertEqual(self.result, 4)
def test_job_run_immediately_keep_skip(self):
job = self.jq.repeating_job(self.job1, 1)
sleep(0.5) # 0.5s | no runs
self.assertEqual(self.result, 0)
job.run_immediately(keep_schedule=True, skip_next=True)
# 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule...
sleep(0.25)
self.assertEqual(self.result, 1)
sleep(0.5) # 1.25s | ...but run at 1s was skipped, rescheduled with interval=1
self.assertEqual(self.result, 1)
sleep(0.5) # 1.75s | last run still at 1s
self.assertEqual(self.result, 1)
sleep(1) # 2.25s | the run at 2s was back to normal
self.assertEqual(self.result, 2)
sleep(1) # 3.25s | just to confirm
self.assertEqual(self.result, 3)
if __name__ == '__main__':
unittest.main()