removed Job.run_immediately and related code

This commit is contained in:
Jannes Höke 2016-12-14 17:15:52 +01:00
parent 09ddc1b1a8
commit d5ce32c672
2 changed files with 45 additions and 279 deletions

View file

@ -23,11 +23,9 @@ import time
import warnings
import datetime
from numbers import Number
from threading import Thread, Lock, RLock, Event
from threading import Thread, Lock, Event
from queue import PriorityQueue, Empty
from telegram.utils.promise import Promise
class Days(object):
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)
@ -58,7 +56,6 @@ class JobQueue(object):
self.logger = logging.getLogger(self.__class__.__name__)
self.__start_lock = Lock()
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
self.__queue_lock = RLock() # to protect self.queue
self.__tick = Event()
self.__thread = None
""":type: Thread"""
@ -132,8 +129,7 @@ class JobQueue(object):
self.logger.debug('Putting job %s with t=%f', job.name, next_t)
with self.__queue_lock:
self.queue.put((next_t, job))
self.queue.put((next_t, job))
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
@ -227,47 +223,6 @@ class JobQueue(object):
self._put(job, next_t=time)
return job
def update_job_due_time(self, job, due):
"""
Changes the due time of a job.
Args:
job (Job): The job of which the due time should be changed. Must already exist in the
queue.
due (float): The new due time as a timestamp
Returns:
float: The old due time of the job as a timestamp
"""
with self.__queue_lock:
cache = list()
# Pop jobs from the priority queue one by one and check if it's the one we need.
# All other jobs are stashed away in the 'cache' list. Once the right job is found,
# it's put back into the queue with the new due time, together with all cached jobs.
try:
while True:
due_test, job_test = self.queue.get(block=False)
if job_test is job:
self.queue.put((due, job_test))
old_due = due_test
break
else:
cache.append((due_test, job_test))
except Empty:
raise ValueError("Specified 'job' doesn't exist in the job queue")
finally:
for item in reversed(cache):
self.queue.put(item)
# Make sure the queue wakes up if the updated due time requires it
self._set_next_peek(due)
return old_due
def _set_next_peek(self, t):
"""
Set next peek if not defined or `t` is before next peek.
@ -283,56 +238,55 @@ class JobQueue(object):
Run all jobs that are due and re-enqueue them with their interval.
"""
with self.__queue_lock:
now = time.time()
now = time.time()
self.logger.debug('Ticking jobs with t=%f', now)
self.logger.debug('Ticking jobs with t=%f', now)
while True:
while True:
try:
t, job = self.queue.get(False)
except Empty:
break
self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t > now:
# We can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already
# processed the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self.queue.put((t, job))
self._set_next_peek(t)
break
if job._remove.is_set():
self.logger.debug('Removing job %s', job.name)
job.job_queue = None
continue
if job.enabled:
try:
t, job = self.queue.get(False)
current_week_day = datetime.datetime.now().weekday()
if any(day == current_week_day for day in job.days):
self.logger.debug('Running job %s', job.name)
job.run(self.bot)
except Empty:
break
except:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
self.logger.debug('Peeked at %s with t=%f', job.name, t)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if t > now:
# We can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already
# processed the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self.queue.put((t, job))
self._set_next_peek(t)
break
if job.repeat:
self._put(job, last_t=t)
if job._remove.is_set():
self.logger.debug('Removing job %s', job.name)
job.job_queue = None
continue
if job.enabled:
try:
current_week_day = datetime.datetime.now().weekday()
if any(day == current_week_day for day in job.days):
self.logger.debug('Running job %s', job.name)
job.run(self.bot)
except:
self.logger.exception(
'An uncaught error was raised while executing job %s', job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
self._put(job, last_t=t)
else:
self.logger.debug('Dropping non-repeating job %s', job.name)
job.job_queue = None
else:
self.logger.debug('Dropping non-repeating job %s', job.name)
job.job_queue = None
def start(self):
"""
@ -387,8 +341,7 @@ class JobQueue(object):
def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``"""
with self.__queue_lock:
return tuple(job[1] for job in self.queue.queue if job)
return tuple(job[1] for job in self.queue.queue if job)
class Job(object):
@ -446,7 +399,6 @@ class Job(object):
self._remove = Event()
self._enabled = Event()
self._enabled.set()
self._immediate_run_in_progress = Event()
def run(self, bot):
"""Executes the callback function"""
@ -459,99 +411,6 @@ class Job(object):
"""
self._remove.set()
def _wrap_callback(self, old_due_promise, keep_schedule, skip_next):
"""Wraps the callback function into two other functions and returns the result"""
original_callback = self.callback
def rescheduled(bot, job):
"""The callback function for the rescheduled run"""
# Run the original callback function
original_callback(bot, job)
# If job is non-repeating, restore the callback, ignore everything else and bail
if not self.repeat:
self.callback = original_callback
return
# Adjust the interval so that the next run is scheduled like it was before
if keep_schedule:
original_interval = self.interval
try:
old_due = old_due_promise.result(timeout=1.0)
except TimeoutError:
# Possible race condition when the JobQueue wants to run this job before it
# could run the Promise in the main thread. The JobQueue thread waits for
# the Promise to resolve, but the main thread waits for the JobQueue to release
# the __queue_lock so it can reschedule this job.
# In case that happens, simply wrap the callback function again and pretend
# like nothing happened yet.
self.callback = original_callback
self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next)
return
self.interval = old_due - time.time()
def next_callback(bot, job):
"""The callback function for the run that was originally next"""
# Restore callback and interval
self.callback = original_callback
if keep_schedule:
self.interval = original_interval
if not skip_next:
original_callback(bot, job)
self._immediate_run_in_progress.clear()
self.callback = next_callback
return rescheduled
def run_immediately(self, keep_schedule=True, skip_next=True):
"""
Puts this job to the front of the job queue, scheduled to run immediately.
Args:
keep_schedule (Optional[bool]): If set to ``True``, which is the default, the job will
be re-scheduled so that the original schedule (as defined by the starting time and
interval) is not changed. If set to ``False``, the schedule will reset so that the
next scheduled time is calculated from the current time and the interval.
This parameter is ignored if the job is not repeating.
skip_next (Optional[bool]): If set to ``True``, which is the default, the next
scheduled execution will be skipped, effectively re-scheduling it to now.
If set to ``False``, this execution is an additional, completely unscheduled
execution.
This parameter is ignored if the job is not repeating.
Raises:
NotImplementedError: If you use this method a second time before the job either ran
the originally scheduled execution once, or has skipped it.
ValueError: If the job is disabled at the moment
"""
if self._immediate_run_in_progress.is_set():
raise NotImplementedError("You can't use 'run_immediately' again until the job has "
"returned to it's normal state. Sorry about that.")
if not self.enabled:
raise ValueError("Job is disabled")
self._immediate_run_in_progress.set()
# Wrap the old due time in a Promise
old_due_promise = Promise(
self.job_queue.update_job_due_time, # pylint: disable=no-member
[self, time.time()],
{})
# Wrap the callback in the wrapper function
self.callback = self._wrap_callback(old_due_promise, keep_schedule, skip_next)
# Run the promise to reschedule the job
old_due_promise.run()
@property
def enabled(self):
return self._enabled.is_set()

View file

@ -258,99 +258,6 @@ class JobQueueTest(BaseTest, unittest.TestCase):
self.assertEqual(self.result, 1)
self.assertAlmostEqual(self.jq.queue.get(False)[0], expected_time, delta=0.1)
def test_update_job_due_time(self):
job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1))
sleep(0.5)
self.assertEqual(self.result, 0)
self.jq.update_job_due_time(job, time.time() + 1)
sleep(0.5)
self.assertEqual(self.result, 0)
sleep(1.5)
self.assertEqual(self.result, 1)
def test_job_run_immediately_one_time(self):
job = self.jq.one_time_job(self.job1, datetime.datetime(2030, 1, 1))
sleep(0.5)
self.assertEqual(self.result, 0)
job.run_immediately()
sleep(0.5)
self.assertEqual(self.result, 1)
def test_job_run_immediately_skip(self):
job = self.jq.repeating_job(self.job1, 1, first=2)
sleep(0.5) # 0.5s | no runs
self.assertEqual(self.result, 0)
job.run_immediately(keep_schedule=False, skip_next=True)
sleep(0.5) # 1s | first run at 0.5s, rescheduled with interval=1 but skipping the next run
self.assertEqual(self.result, 1)
sleep(1) # 2s | run at 1.5s was skipped
self.assertEqual(self.result, 1)
sleep(1) # 3s | run at 2.5s was back to normal
self.assertEqual(self.result, 2)
sleep(1) # 4s | just to confirm
self.assertEqual(self.result, 3)
def test_job_run_immediately_keep(self):
job = self.jq.repeating_job(self.job1, 1)
sleep(0.5) # 0.5s | no runs
self.assertEqual(self.result, 0)
job.run_immediately(keep_schedule=True, skip_next=False)
# 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule
sleep(0.25)
self.assertEqual(self.result, 1)
sleep(0.5) # 1.25s | run at 1s, rescheduled with interval=1
self.assertEqual(self.result, 2)
sleep(0.5) # 1.75s | last run still at 1s
self.assertEqual(self.result, 2)
sleep(1) # 2.25s | run at 2s was back to normal
self.assertEqual(self.result, 3)
sleep(1) # 3.25s | just to confirm
self.assertEqual(self.result, 4)
def test_job_run_immediately_keep_skip(self):
job = self.jq.repeating_job(self.job1, 1)
sleep(0.5) # 0.5s | no runs
self.assertEqual(self.result, 0)
job.run_immediately(keep_schedule=True, skip_next=True)
# 0.75s | first run at 0.5s, rescheduled with interval=0.5 to keep up with the schedule...
sleep(0.25)
self.assertEqual(self.result, 1)
sleep(0.5) # 1.25s | ...but run at 1s was skipped, rescheduled with interval=1
self.assertEqual(self.result, 1)
sleep(0.5) # 1.75s | last run still at 1s
self.assertEqual(self.result, 1)
sleep(1) # 2.25s | the run at 2s was back to normal
self.assertEqual(self.result, 2)
sleep(1) # 3.25s | just to confirm
self.assertEqual(self.result, 3)
if __name__ == '__main__':
unittest.main()