python-telegram-bot/telegram/ext/jobqueue.py

491 lines
19 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2018
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
2016-05-26 14:01:59 +02:00
"""This module contains the classes JobQueue and Job."""
import datetime
import logging
import time
import warnings
import weakref
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
from numbers import Number
from queue import PriorityQueue, Empty
from threading import Thread, Lock, Event
from telegram.ext.callbackcontext import CallbackContext
from telegram.utils.deprecate import TelegramDeprecationWarning
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
class Days(object):
MON, TUE, WED, THU, FRI, SAT, SUN = range(7)
EVERY_DAY = tuple(range(7))
class JobQueue(object):
2017-09-01 08:43:08 +02:00
"""This class allows you to periodically perform tasks with the bot.
Attributes:
_queue (:obj:`PriorityQueue`): The queue that holds the Jobs.
bot (:class:`telegram.Bot`): The bot instance that should be passed to the jobs.
DEPRECATED: Use set_dispatcher instead.
"""
def __init__(self, bot=None):
self._queue = PriorityQueue()
if bot:
warnings.warn("Passing bot to jobqueue is deprecated. Please use set_dispatcher "
"instead!", TelegramDeprecationWarning, stacklevel=2)
class MockDispatcher(object):
def __init__(self):
self.bot = bot
self.use_context = False
self._dispatcher = MockDispatcher()
else:
self._dispatcher = None
self.logger = logging.getLogger(self.__class__.__name__)
self.__start_lock = Lock()
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
self.__tick = Event()
self.__thread = None
self._next_peek = None
self._running = False
def set_dispatcher(self, dispatcher):
self._dispatcher = dispatcher
2016-12-14 16:27:45 +01:00
def _put(self, job, next_t=None, last_t=None):
if next_t is None:
next_t = job.interval
if next_t is None:
raise ValueError('next_t is None')
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
if isinstance(next_t, datetime.datetime):
next_t = (next_t - datetime.datetime.now()).total_seconds()
elif isinstance(next_t, datetime.time):
next_datetime = datetime.datetime.combine(datetime.date.today(), next_t)
if datetime.datetime.now().time() > next_t:
next_datetime += datetime.timedelta(days=1)
next_t = (next_datetime - datetime.datetime.now()).total_seconds()
elif isinstance(next_t, datetime.timedelta):
next_t = next_t.total_seconds()
2016-12-14 16:27:45 +01:00
next_t += last_t or time.time()
2016-06-21 21:20:57 +02:00
self.logger.debug('Putting job %s with t=%f', job.name, next_t)
2016-12-14 16:27:45 +01:00
self._queue.put((next_t, job))
2016-05-26 14:01:59 +02:00
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
2016-12-14 18:01:44 +01:00
def run_once(self, callback, when, context=None, name=None):
2017-09-01 08:43:08 +02:00
"""Creates a new ``Job`` that runs once and adds it to the queue.
2016-12-14 16:27:45 +01:00
Args:
callback (:obj:`callable`): The callback function that should be executed by the new
job. It should take ``bot, job`` as parameters, where ``job`` is the
:class:`telegram.ext.Job` instance. It can be used to access its
``job.context`` or change it to a repeating job.
2018-02-19 09:36:40 +01:00
when (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`):
2016-12-14 16:27:45 +01:00
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
* :obj:`int` or :obj:`float` will be interpreted as "seconds from now" in which the
job should run.
* :obj:`datetime.timedelta` will be interpreted as "time from now" in which the
job should run.
* :obj:`datetime.datetime` will be interpreted as a specific date and time at
which the job should run.
* :obj:`datetime.time` will be interpreted as a specific time of day at which the
job should run. This could be either today or, if the time has already passed,
tomorrow.
context (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through ``job.context`` in the callback. Defaults to ``None``.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
2016-12-14 16:27:45 +01:00
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
2017-09-01 08:43:08 +02:00
"""
job = Job(callback, repeat=False, context=context, name=name, job_queue=self)
2016-12-14 16:27:45 +01:00
self._put(job, next_t=when)
return job
2016-12-14 18:01:44 +01:00
def run_repeating(self, callback, interval, first=None, context=None, name=None):
"""Creates a new ``Job`` that runs at specified intervals and adds it to the queue.
2016-12-14 16:27:45 +01:00
Args:
callback (:obj:`callable`): The callback function that should be executed by the new
job. It should take ``bot, job`` as parameters, where ``job`` is the
:class:`telegram.ext.Job` instance. It can be used to access its
``Job.context`` or change it to a repeating job.
interval (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`): The interval in which
the job will run. If it is an :obj:`int` or a :obj:`float`, it will be interpreted
as seconds.
2018-02-19 09:36:40 +01:00
first (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`, optional):
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
* :obj:`int` or :obj:`float` will be interpreted as "seconds from now" in which the
job should run.
* :obj:`datetime.timedelta` will be interpreted as "time from now" in which the
job should run.
* :obj:`datetime.datetime` will be interpreted as a specific date and time at
which the job should run.
* :obj:`datetime.time` will be interpreted as a specific time of day at which the
job should run. This could be either today or, if the time has already passed,
tomorrow.
2016-12-14 16:27:45 +01:00
Defaults to ``interval``
context (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through ``job.context`` in the callback. Defaults to ``None``.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
2016-12-14 16:27:45 +01:00
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
2017-09-01 08:43:08 +02:00
2016-12-14 16:27:45 +01:00
"""
job = Job(callback,
interval=interval,
repeat=True,
context=context,
name=name,
job_queue=self)
2016-12-14 16:27:45 +01:00
self._put(job, next_t=first)
return job
2016-12-14 18:01:44 +01:00
def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None):
"""Creates a new ``Job`` that runs on a daily basis and adds it to the queue.
2016-12-14 16:27:45 +01:00
Args:
callback (:obj:`callable`): The callback function that should be executed by the new
job. It should take ``bot, job`` as parameters, where ``job`` is the
:class:`telegram.ext.Job` instance. It can be used to access its ``Job.context``
or change it to a repeating job.
time (:obj:`datetime.time`): Time of day at which the job should run.
days (Tuple[:obj:`int`], optional): Defines on which days of the week the job should
run. Defaults to ``EVERY_DAY``
context (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through ``job.context`` in the callback. Defaults to ``None``.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
2016-12-14 16:27:45 +01:00
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
2017-09-01 08:43:08 +02:00
2016-12-14 16:27:45 +01:00
"""
job = Job(callback,
interval=datetime.timedelta(days=1),
repeat=True,
days=days,
context=context,
name=name,
job_queue=self)
2016-12-14 16:27:45 +01:00
self._put(job, next_t=time)
return job
def _set_next_peek(self, t):
# """
# Set next peek if not defined or `t` is before next peek.
# In case the next peek was set, also trigger the `self.__tick` event.
# """
with self.__next_peek_lock:
if not self._next_peek or self._next_peek > t:
self._next_peek = t
self.__tick.set()
2016-01-05 13:32:19 +01:00
def tick(self):
2017-09-01 08:43:08 +02:00
"""Run all jobs that are due and re-enqueue them with their interval."""
now = time.time()
self.logger.debug('Ticking jobs with t=%f', now)
while True:
try:
t, job = self._queue.get(False)
except Empty:
break
self.logger.debug('Peeked at %s with t=%f', job.name, t)
if t > now:
# We can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already
# processed the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self._queue.put((t, job))
self._set_next_peek(t)
break
2016-12-20 22:37:36 +01:00
if job.removed:
self.logger.debug('Removing job %s', job.name)
continue
if job.enabled:
try:
current_week_day = datetime.datetime.now().weekday()
if any(day == current_week_day for day in job.days):
self.logger.debug('Running job %s', job.name)
job.run(self._dispatcher)
except Exception:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
2016-12-20 22:37:36 +01:00
if job.repeat and not job.removed:
self._put(job, last_t=t)
else:
self.logger.debug('Dropping non-repeating or removed job %s', job.name)
def start(self):
2017-09-01 08:43:08 +02:00
"""Starts the job_queue thread."""
self.__start_lock.acquire()
2016-05-26 14:01:59 +02:00
if not self._running:
self._running = True
self.__start_lock.release()
self.__thread = Thread(target=self._main_loop, name="job_queue")
self.__thread.start()
2016-06-21 21:20:57 +02:00
self.logger.debug('%s thread started', self.__class__.__name__)
else:
self.__start_lock.release()
def _main_loop(self):
"""
2016-05-26 14:01:59 +02:00
Thread target of thread ``job_queue``. Runs in background and performs ticks on the job
queue.
2017-09-01 08:43:08 +02:00
"""
while self._running:
# self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
tmout = self._next_peek - time.time() if self._next_peek else None
self._next_peek = None
self.__tick.clear()
self.__tick.wait(tmout)
# If we were woken up by self.stop(), just bail out
if not self._running:
break
self.tick()
2016-06-21 21:20:57 +02:00
self.logger.debug('%s thread stopped', self.__class__.__name__)
def stop(self):
2017-09-01 08:43:08 +02:00
"""Stops the thread."""
with self.__start_lock:
self._running = False
self.__tick.set()
if self.__thread is not None:
self.__thread.join()
def jobs(self):
2017-09-01 08:43:08 +02:00
"""Returns a tuple of all jobs that are currently in the ``JobQueue``."""
with self._queue.mutex:
return tuple(job[1] for job in self._queue.queue if job)
2018-02-19 09:36:40 +01:00
def get_jobs_by_name(self, name):
"""Returns a tuple of jobs with the given name that are currently in the ``JobQueue``"""
with self._queue.mutex:
return tuple(job[1] for job in self._queue.queue if job and job[1].name == name)
class Job(object):
2017-09-01 08:43:08 +02:00
"""This class encapsulates a Job.
Attributes:
callback (:obj:`callable`): The callback function that should be executed by the new job.
context (:obj:`object`): Optional. Additional data needed for the callback function.
name (:obj:`str`): Optional. The name of the new job.
Args:
callback (:obj:`callable`): The callback function that should be executed by the new job.
It should take ``bot, job`` as parameters, where ``job`` is the
:class:`telegram.ext.Job` instance. It can be used to access it's :attr:`context`
or change it to a repeating job.
interval (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`, optional): The interval in
which the job will run. If it is an :obj:`int` or a :obj:`float`, it will be
interpreted as seconds. If you don't set this value, you must set :attr:`repeat` to
``False`` and specify :attr:`next_t` when you put the job into the job queue.
repeat (:obj:`bool`, optional): If this job should be periodically execute its callback
function (``True``) or only once (``False``). Defaults to ``True``.
context (:obj:`object`, optional): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to ``None``.
name (:obj:`str`, optional): The name of the new job. Defaults to ``callback.__name__``.
days (Tuple[:obj:`int`], optional): Defines on which days of the week the job should run.
2016-12-14 16:27:45 +01:00
Defaults to ``Days.EVERY_DAY``
job_queue (:class:`telegram.ext.JobQueue`, optional): The ``JobQueue`` this job belongs to.
Only optional for backward compatibility with ``JobQueue.put()``.
2017-09-01 08:43:08 +02:00
"""
2016-12-14 16:27:45 +01:00
def __init__(self,
callback,
interval=None,
repeat=True,
context=None,
days=Days.EVERY_DAY,
name=None,
job_queue=None):
2016-12-14 16:27:45 +01:00
self.callback = callback
self.context = context
2016-12-14 16:27:45 +01:00
self.name = name or callback.__name__
2016-12-14 16:27:45 +01:00
self._repeat = repeat
self._interval = None
self.interval = interval
self.repeat = repeat
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
2016-12-14 16:27:45 +01:00
self._days = None
self.days = days
self._job_queue = weakref.proxy(job_queue) if job_queue is not None else None
self._remove = Event()
self._enabled = Event()
self._enabled.set()
def run(self, dispatcher):
2017-09-01 08:43:08 +02:00
"""Executes the callback function."""
if dispatcher.use_context:
self.callback(CallbackContext.from_job(self, dispatcher))
else:
self.callback(dispatcher.bot, self)
def schedule_removal(self):
"""
Schedules this job for removal from the ``JobQueue``. It will be removed without executing
its callback function again.
2017-09-01 08:43:08 +02:00
"""
self._remove.set()
@property
2016-12-20 22:37:36 +01:00
def removed(self):
2017-09-01 08:43:08 +02:00
""":obj:`bool`: Whether this job is due to be removed."""
return self._remove.is_set()
2016-12-14 16:27:45 +01:00
@property
def enabled(self):
2017-09-01 08:43:08 +02:00
""":obj:`bool`: Whether this job is enabled."""
return self._enabled.is_set()
2016-12-14 16:27:45 +01:00
@enabled.setter
def enabled(self, status):
if status:
self._enabled.set()
else:
self._enabled.clear()
2016-12-14 16:27:45 +01:00
@property
def interval(self):
"""
:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`: Optional. The interval in which the
job will run.
2017-09-01 08:43:08 +02:00
"""
2016-12-14 16:27:45 +01:00
return self._interval
@interval.setter
def interval(self, interval):
if interval is None and self.repeat:
raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'")
if not (interval is None or isinstance(interval, (Number, datetime.timedelta))):
2016-12-14 16:27:45 +01:00
raise ValueError("The 'interval' must be of type 'datetime.timedelta',"
" 'int' or 'float'")
self._interval = interval
@property
def interval_seconds(self):
2017-09-01 08:43:08 +02:00
""":obj:`int`: The interval for this job in seconds."""
interval = self.interval
if isinstance(interval, datetime.timedelta):
return interval.total_seconds()
2016-12-14 16:27:45 +01:00
else:
return interval
2016-12-14 16:27:45 +01:00
@property
def repeat(self):
2017-09-01 08:43:08 +02:00
""":obj:`bool`: Optional. If this job should periodically execute its callback function."""
2016-12-14 16:27:45 +01:00
return self._repeat
@repeat.setter
def repeat(self, repeat):
if self.interval is None and repeat:
raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set")
self._repeat = repeat
@property
def days(self):
2017-09-01 08:43:08 +02:00
"""Tuple[:obj:`int`]: Optional. Defines on which days of the week the job should run."""
2016-12-14 16:27:45 +01:00
return self._days
@days.setter
def days(self, days):
if not isinstance(days, tuple):
raise ValueError("The 'days' argument should be of type 'tuple'")
if not all(isinstance(day, int) for day in days):
raise ValueError("The elements of the 'days' argument should be of type 'int'")
if not all(0 <= day <= 6 for day in days):
raise ValueError("The elements of the 'days' argument should be from 0 up to and "
"including 6")
self._days = days
@property
def job_queue(self):
2017-09-01 08:43:08 +02:00
""":class:`telegram.ext.JobQueue`: Optional. The ``JobQueue`` this job belongs to."""
2016-12-14 16:27:45 +01:00
return self._job_queue
@job_queue.setter
def job_queue(self, job_queue):
# Property setter for backward compatibility with JobQueue.put()
if not self._job_queue:
self._job_queue = weakref.proxy(job_queue)
2016-12-14 16:27:45 +01:00
else:
raise RuntimeError("The 'job_queue' attribute can only be set once.")
def __lt__(self, other):
return False