jobqueue.py: stability improvments

- Job.job_queue is now weakref.proxy reducing the risk of cyclic
   pointers preventing Job object from being deleted.
 - JobQueue._put(): raise if both next_t and job.interval are None
 - Don't put repeating job back to queue if user had disabled it was
   disabled during the time of execution.
 - New method: Job.is_removed() - promising a consistent API (instead of
   access to private member Job._remove)
 - Documentation fixes.
This commit is contained in:
Noam Meltzer 2016-12-15 00:08:03 +02:00
parent cbf93e1046
commit 93bf21a0a4

View file

@ -22,6 +22,7 @@ import logging
import time
import warnings
import datetime
import weakref
from numbers import Number
from threading import Thread, Lock, Event
from queue import PriorityQueue, Empty
@ -82,6 +83,8 @@ class JobQueue(object):
"""
warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.one_time_job', "
"'JobQueue.repeating_job' or 'JobQueue.daily_job' instead")
if job.job_queue is None:
job.job_queue = self
self._put(job, next_t=next_t)
def _put(self, job, next_t=None, last_t=None):
@ -92,27 +95,29 @@ class JobQueue(object):
next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
last_t (Optional[float]): Timestamp of the time when ``job`` was scheduled for in the
last ``put`` call. If provided, it will used to calculate the next timestamp more
accurately by accounting for the execution time of the job (and possibly others).
By default, the current timestamp is used.
"""
if job.job_queue is not self:
job.job_queue = self
* ``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
* ``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
* ``datetime.datetime`` will be interpreted as a specific date and time at which
the job should run.
* ``datetime.time`` will be interpreted as a specific time of day at which the job
should run. This could be either today or, if the time has already passed,
tomorrow.
last_t (Optional[float]): Timestamp of the time when ``job`` was scheduled for in the
last ``put`` call. If provided, it will be used to calculate the next timestamp
more accurately by accounting for the execution time of the job (and possibly
others). If None, `now` will be assumed.
"""
if next_t is None:
next_t = job.interval
if next_t is None:
raise ValueError('next_t is None')
if isinstance(next_t, datetime.datetime):
next_t = next_t - datetime.datetime.now()
next_t = (next_t - datetime.datetime.now()).total_seconds()
elif isinstance(next_t, datetime.time):
next_datetime = datetime.datetime.combine(datetime.date.today(), next_t)
@ -120,9 +125,9 @@ class JobQueue(object):
if datetime.datetime.now().time() > next_t:
next_datetime += datetime.timedelta(days=1)
next_t = next_datetime - datetime.datetime.now()
next_t = (next_datetime - datetime.datetime.now()).total_seconds()
if isinstance(next_t, datetime.timedelta):
elif isinstance(next_t, datetime.timedelta):
next_t = next_t.total_seconds()
next_t += last_t or time.time()
@ -145,22 +150,26 @@ class JobQueue(object):
when (int, float, datetime.timedelta, datetime.datetime, datetime.time):
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
* ``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
* ``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
* ``datetime.datetime`` will be interpreted as a specific date and time at which
the job should run.
* ``datetime.time`` will be interpreted as a specific time of day at which the job
should run. This could be either today or, if the time has already passed,
tomorrow.
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
name (Optional[str]): The name of the new job. Defaults to ``callback.__name__``
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback, repeat=False, context=context, name=name)
job = Job(callback, repeat=False, context=context, name=name, job_queue=self)
self._put(job, next_t=when)
return job
@ -175,16 +184,17 @@ class JobQueue(object):
interval (int, float, datetime.timedelta): The interval in which the job will run.
If it is an ``int`` or a ``float``, it will be interpreted as seconds.
first (int, float, datetime.timedelta, datetime.datetime, datetime.time):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
* ``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
* ``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
* ``datetime.datetime`` will be interpreted as a specific date and time at which
the job should run.
* ``datetime.time`` will be interpreted as a specific time of day at which the job
should run. This could be either today or, if the time has already passed,
tomorrow.
Defaults to ``interval``
context (Optional[object]): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``
@ -192,8 +202,14 @@ class JobQueue(object):
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback, interval=interval, repeat=True, context=context, name=name)
job = Job(callback,
interval=interval,
repeat=True,
context=context,
name=name,
job_queue=self)
self._put(job, next_t=first)
return job
@ -213,13 +229,15 @@ class JobQueue(object):
Returns:
Job: The new ``Job`` instance that has been added to the job queue.
"""
job = Job(callback,
interval=datetime.timedelta(days=1),
repeat=True,
days=days,
context=context,
name=name)
name=name,
job_queue=self)
self._put(job, next_t=time)
return job
@ -245,7 +263,6 @@ class JobQueue(object):
while True:
try:
t, job = self.queue.get(False)
except Empty:
break
@ -262,9 +279,8 @@ class JobQueue(object):
self._set_next_peek(t)
break
if job._remove.is_set():
if job.is_removed():
self.logger.debug('Removing job %s', job.name)
job.job_queue = None
continue
if job.enabled:
@ -277,16 +293,13 @@ class JobQueue(object):
except:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat:
if job.repeat and not job.is_removed():
self._put(job, last_t=t)
else:
self.logger.debug('Dropping non-repeating job %s', job.name)
job.job_queue = None
self.logger.debug('Dropping non-repeating or removed job %s', job.name)
def start(self):
"""
@ -301,7 +314,6 @@ class JobQueue(object):
self.__thread = Thread(target=self._main_loop, name="job_queue")
self.__thread.start()
self.logger.debug('%s thread started', self.__class__.__name__)
else:
self.__start_lock.release()
@ -314,7 +326,7 @@ class JobQueue(object):
while self._running:
# self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
tmout = self._next_peek and self._next_peek - time.time()
tmout = self._next_peek - time.time() if self._next_peek else None
self._next_peek = None
self.__tick.clear()
@ -372,6 +384,9 @@ class Job(object):
days (Optional[tuple[int]]): Defines on which days of the week the job should run.
Defaults to ``Days.EVERY_DAY``
name (Optional[str]): The name of this job. Defaults to ``callback.__name__``
job_queue (Optional[class:`telegram.ext.JobQueue`]): The ``JobQueue`` this job belongs to.
Only optional for backward compatibility with ``JobQueue.put()``.
"""
def __init__(self,
@ -380,7 +395,8 @@ class Job(object):
repeat=True,
context=None,
days=Days.EVERY_DAY,
name=None):
name=None,
job_queue=None):
self.callback = callback
self.context = context
@ -394,7 +410,7 @@ class Job(object):
self._days = None
self.days = days
self._job_queue = None
self._job_queue = weakref.proxy(job_queue) if job_queue is not None else None
self._remove = Event()
self._enabled = Event()
@ -411,6 +427,9 @@ class Job(object):
"""
self._remove.set()
def is_removed(self):
return self._remove.is_set()
@property
def enabled(self):
return self._enabled.is_set()
@ -431,8 +450,7 @@ class Job(object):
if interval is None and self.repeat:
raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'")
if not (interval is None or isinstance(interval, Number) or
isinstance(interval, datetime.timedelta)):
if not (interval is None or isinstance(interval, (Number, datetime.timedelta))):
raise ValueError("The 'interval' must be of type 'datetime.timedelta',"
" 'int' or 'float'")
@ -480,10 +498,11 @@ class Job(object):
@job_queue.setter
def job_queue(self, job_queue):
if not self._job_queue or not job_queue:
self._job_queue = job_queue
# Property setter for backward compatibility with JobQueue.put()
if not self._job_queue:
self._job_queue = weakref.proxy(job_queue)
else:
raise ValueError("The 'job_queue' attribute can only be set once.")
raise RuntimeError("The 'job_queue' attribute can only be set once.")
def __lt__(self, other):
return False