Refactor JobQueue (#1981)

* First go on refactoring JobQueue

* Temporarily enable tests for the v13 branch

* Work on tests

* Temporarily enable tests for the v13 branch

* Increase coverage

* Remove JobQueue.tick()

* Address review

* Temporarily enable tests for the v13 branch

* Address review

* Dispatch errors

* Fix handling of job_kwargs

* Remove possibility to pass a Bot to JobQueue
This commit is contained in:
Bibo-Joshi 2020-07-10 13:11:28 +02:00
parent 3930072659
commit 19a4f9e53a
12 changed files with 530 additions and 711 deletions

View file

@ -15,7 +15,7 @@ jobs:
runs-on: ${{matrix.os}}
strategy:
matrix:
python-version: [3.5, 3.6, 3.7, 3.8]
python-version: [3.6, 3.7, 3.8]
os: [ubuntu-latest, windows-latest]
include:
- os: ubuntu-latest

View file

@ -83,7 +83,7 @@ Introduction
This library provides a pure Python interface for the
`Telegram Bot API <https://core.telegram.org/bots/api>`_.
It's compatible with Python versions 3.5+. PTB might also work on `PyPy <http://pypy.org/>`_, though there have been a lot of issues before. Hence, PyPy is not officially supported.
It's compatible with Python versions 3.6+. PTB might also work on `PyPy <http://pypy.org/>`_, though there have been a lot of issues before. Hence, PyPy is not officially supported.
In addition to the pure API implementation, this library features a number of high-level classes to
make the development of bots easy and straightforward. These classes are contained in the

View file

@ -2,3 +2,4 @@ certifi
tornado>=5.1
cryptography
decorator>=4.4.0
APScheduler==3.6.3

View file

@ -61,7 +61,6 @@ with codecs.open('README.rst', 'r', 'utf-8') as fd:
'Topic :: Internet',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',

View file

@ -18,19 +18,16 @@
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the classes JobQueue and Job."""
import calendar
import datetime
import logging
import time
import warnings
import weakref
from numbers import Number
from queue import PriorityQueue, Empty
from threading import Thread, Lock, Event
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.combining import OrTrigger
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from telegram.ext.callbackcontext import CallbackContext
from telegram.utils.deprecate import TelegramDeprecationWarning
from telegram.utils.helpers import to_float_timestamp
class Days:
@ -39,36 +36,66 @@ class Days:
class JobQueue:
"""This class allows you to periodically perform tasks with the bot.
"""This class allows you to periodically perform tasks with the bot. It is a convenience
wrapper for the APScheduler library.
Attributes:
_queue (:obj:`PriorityQueue`): The queue that holds the Jobs.
scheduler (:class:`apscheduler.schedulers.background.BackgroundScheduler`): The APScheduler
bot (:class:`telegram.Bot`): The bot instance that should be passed to the jobs.
DEPRECATED: Use :attr:`set_dispatcher` instead.
"""
def __init__(self, bot=None):
self._queue = PriorityQueue()
if bot:
warnings.warn("Passing bot to jobqueue is deprecated. Please use set_dispatcher "
"instead!", TelegramDeprecationWarning, stacklevel=2)
class MockDispatcher:
def __init__(self):
self.bot = bot
self.use_context = False
self._dispatcher = MockDispatcher()
else:
self._dispatcher = None
def __init__(self):
self._dispatcher = None
self.logger = logging.getLogger(self.__class__.__name__)
self.__start_lock = Lock()
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
self.__tick = Event()
self.__thread = None
self._next_peek = None
self._running = False
self.scheduler = BackgroundScheduler(timezone=pytz.utc)
self.scheduler.add_listener(self._update_persistence,
mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Dispatch errors and don't log them in the APS logger
def aps_log_filter(record):
return 'raised an exception' not in record.msg
logging.getLogger('apscheduler.executors.default').addFilter(aps_log_filter)
self.scheduler.add_listener(self._dispatch_error, EVENT_JOB_ERROR)
def _build_args(self, job):
if self._dispatcher.use_context:
return [CallbackContext.from_job(job, self._dispatcher)]
return [self._dispatcher.bot, job]
def _tz_now(self):
return datetime.datetime.now(self.scheduler.timezone)
def _update_persistence(self, event):
self._dispatcher.update_persistence()
def _dispatch_error(self, event):
try:
self._dispatcher.dispatch_error(None, event.exception)
# Errors should not stop the thread.
except Exception:
self.logger.exception('An error was raised while processing the job and an '
'uncaught error was raised while handling the error '
'with an error_handler.')
def _parse_time_input(self, time, shift_day=False):
if time is None:
return None
if isinstance(time, (int, float)):
return self._tz_now() + datetime.timedelta(seconds=time)
if isinstance(time, datetime.timedelta):
return self._tz_now() + time
if isinstance(time, datetime.time):
dt = datetime.datetime.combine(
datetime.datetime.now(tz=time.tzinfo or self.scheduler.timezone).date(), time,
tzinfo=time.tzinfo or self.scheduler.timezone)
if shift_day and dt <= datetime.datetime.now(pytz.utc):
dt += datetime.timedelta(days=1)
return dt
# isinstance(time, datetime.datetime):
return time
def set_dispatcher(self, dispatcher):
"""Set the dispatcher to be used by this JobQueue. Use this instead of passing a
@ -80,37 +107,7 @@ class JobQueue:
"""
self._dispatcher = dispatcher
def _put(self, job, time_spec=None, previous_t=None):
"""
Enqueues the job, scheduling its next run at the correct time.
Args:
job (telegram.ext.Job): job to enqueue
time_spec (optional):
Specification of the time for which the job should be scheduled. The precise
semantics of this parameter depend on its type (see
:func:`telegram.ext.JobQueue.run_repeating` for details).
Defaults to now + ``job.interval``.
previous_t (optional):
Time at which the job last ran (:obj:`None` if it hasn't run yet).
"""
# get time at which to run:
if time_spec is None:
time_spec = job.interval
if time_spec is None:
raise ValueError("no time specification given for scheduling non-repeating job")
next_t = to_float_timestamp(time_spec, reference_timestamp=previous_t)
# enqueue:
self.logger.debug('Putting job %s with t=%s', job.name, time_spec)
self._queue.put((next_t, job))
job._set_next_t(next_t)
# Wake up the loop if this job should be executed next
self._set_next_peek(next_t)
def run_once(self, callback, when, context=None, name=None):
def run_once(self, callback, when, context=None, name=None, job_kwargs=None):
"""Creates a new ``Job`` that runs once and adds it to the queue.
Args:
@ -144,24 +141,34 @@ class JobQueue:
Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
``scheduler.add_job()``.
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
"""
tzinfo = when.tzinfo if isinstance(when, (datetime.datetime, datetime.time)) else None
if not job_kwargs:
job_kwargs = {}
job = Job(callback,
repeat=False,
context=context,
name=name,
job_queue=self,
tzinfo=tzinfo)
self._put(job, time_spec=when)
name = name or callback.__name__
job = Job(callback, context, name, self)
dt = self._parse_time_input(when, shift_day=True)
j = self.scheduler.add_job(callback,
name=name,
trigger='date',
run_date=dt,
args=self._build_args(job),
timezone=dt.tzinfo or self.scheduler.timezone,
**job_kwargs)
job.job = j
return job
def run_repeating(self, callback, interval, first=None, context=None, name=None):
def run_repeating(self, callback, interval, first=None, last=None, context=None, name=None,
job_kwargs=None):
"""Creates a new ``Job`` that runs at specified intervals and adds it to the queue.
Args:
@ -195,10 +202,21 @@ class JobQueue:
then ``first.tzinfo`` will define ``Job.tzinfo``. Otherwise UTC will be assumed.
Defaults to ``interval``
last (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`, optional):
Latest possible time for the job to run. This parameter will be interpreted
depending on its type. See ``first`` for details.
If ``last`` is :obj:`datetime.datetime` or :obj:`datetime.time` type
and ``last.tzinfo`` is :obj:`None`, UTC will be assumed.
Defaults to :obj:`None`.
context (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
``scheduler.add_job()``.
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
@ -210,19 +228,35 @@ class JobQueue:
to pin servers to UTC time, then time related behaviour can always be expected.
"""
tzinfo = first.tzinfo if isinstance(first, (datetime.datetime, datetime.time)) else None
if not job_kwargs:
job_kwargs = {}
job = Job(callback,
interval=interval,
repeat=True,
context=context,
name=name,
job_queue=self,
tzinfo=tzinfo)
self._put(job, time_spec=first)
name = name or callback.__name__
job = Job(callback, context, name, self)
dt_first = self._parse_time_input(first)
dt_last = self._parse_time_input(last)
if dt_last and dt_first and dt_last < dt_first:
raise ValueError("'last' must not be before 'first'!")
if isinstance(interval, datetime.timedelta):
interval = interval.total_seconds()
j = self.scheduler.add_job(callback,
trigger='interval',
args=self._build_args(job),
start_date=dt_first,
end_date=dt_last,
seconds=interval,
name=name,
**job_kwargs)
job.job = j
return job
def run_monthly(self, callback, when, day, context=None, name=None, day_is_strict=True):
def run_monthly(self, callback, when, day, context=None, name=None, day_is_strict=True,
job_kwargs=None):
"""Creates a new ``Job`` that runs on a monthly basis and adds it to the queue.
Args:
@ -244,92 +278,55 @@ class JobQueue:
``callback.__name__``.
day_is_strict (:obj:`bool`, optional): If :obj:`False` and day > month.days, will pick
the last day in the month. Defaults to :obj:`True`.
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
``scheduler.add_job()``.
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
"""
tzinfo = when.tzinfo if isinstance(when, (datetime.datetime, datetime.time)) else None
if 1 <= day <= 31:
next_dt = self._get_next_month_date(day, day_is_strict, when, allow_now=True)
job = Job(callback, repeat=False, context=context, name=name, job_queue=self,
is_monthly=True, day_is_strict=day_is_strict, tzinfo=tzinfo)
self._put(job, time_spec=next_dt)
return job
if not job_kwargs:
job_kwargs = {}
name = name or callback.__name__
job = Job(callback, context, name, self)
if day_is_strict:
j = self.scheduler.add_job(callback,
trigger='cron',
args=self._build_args(job),
name=name,
day=day,
hour=when.hour,
minute=when.minute,
second=when.second,
timezone=when.tzinfo or self.scheduler.timezone,
**job_kwargs)
else:
raise ValueError("The elements of the 'day' argument should be from 1 up to"
" and including 31")
trigger = OrTrigger([CronTrigger(day=day,
hour=when.hour,
minute=when.minute,
second=when.second,
timezone=when.tzinfo,
**job_kwargs),
CronTrigger(day='last',
hour=when.hour,
minute=when.minute,
second=when.second,
timezone=when.tzinfo or self.scheduler.timezone,
**job_kwargs)])
j = self.scheduler.add_job(callback,
trigger=trigger,
args=self._build_args(job),
name=name,
**job_kwargs)
def _get_next_month_date(self, day, day_is_strict, when, allow_now=False):
"""This method returns the date that the next monthly job should be scheduled.
job.job = j
return job
Args:
day (:obj:`int`): The day of the month the job should run.
day_is_strict (:obj:`bool`):
Specification as to whether the specified day of job should be strictly
respected. If day_is_strict is :obj:`True` it ignores months whereby the
specified date does not exist (e.g February 31st). If it set to :obj:`False`,
it returns the last valid date of the month instead. For example,
if the user runs a job on the 31st of every month, and sets
the day_is_strict variable to :obj:`False`, April, for example,
the job would run on April 30th.
when (:obj:`datetime.time`): Time of day at which the job should run. If the
timezone (``time.tzinfo``) is :obj:`None`, UTC will be assumed.
allow_now (:obj:`bool`): Whether executing the job right now is a feasible options.
For stability reasons, this defaults to :obj:`False`, but it needs to be :obj:`True`
on initializing a job.
"""
dt = datetime.datetime.now(tz=when.tzinfo or datetime.timezone.utc)
dt_time = dt.time().replace(tzinfo=when.tzinfo)
days_in_current_month = calendar.monthrange(dt.year, dt.month)[1]
days_till_months_end = days_in_current_month - dt.day
if days_in_current_month < day:
# if the day does not exist in the current month (e.g Feb 31st)
if day_is_strict is False:
# set day as last day of month instead
next_dt = dt + datetime.timedelta(days=days_till_months_end)
else:
# else set as day in subsequent month. Subsequent month is
# guaranteed to have the date, if current month does not have the date.
next_dt = dt + datetime.timedelta(days=days_till_months_end + day)
else:
# if the day exists in the current month
if dt.day < day:
# day is upcoming
next_dt = dt + datetime.timedelta(day - dt.day)
elif dt.day > day or (dt.day == day and ((not allow_now and dt_time >= when)
or (allow_now and dt_time > when))):
# run next month if day has already passed
next_year = dt.year + 1 if dt.month == 12 else dt.year
next_month = 1 if dt.month == 12 else dt.month + 1
days_in_next_month = calendar.monthrange(next_year, next_month)[1]
next_month_has_date = days_in_next_month >= day
if next_month_has_date:
next_dt = dt + datetime.timedelta(days=days_till_months_end + day)
elif day_is_strict:
# schedule the subsequent month if day is strict
next_dt = dt + datetime.timedelta(
days=days_till_months_end + days_in_next_month + day)
else:
# schedule in the next month last date if day is not strict
next_dt = dt + datetime.timedelta(days=days_till_months_end
+ days_in_next_month)
else:
# day is today but time has not yet come
next_dt = dt
# Set the correct time
next_dt = next_dt.replace(hour=when.hour, minute=when.minute, second=when.second,
microsecond=when.microsecond)
# fold is new in Py3.6
if hasattr(next_dt, 'fold'):
next_dt = next_dt.replace(fold=when.fold)
return next_dt
def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None):
def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None,
job_kwargs=None):
"""Creates a new ``Job`` that runs on a daily basis and adds it to the queue.
Args:
@ -349,158 +346,112 @@ class JobQueue:
Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
``scheduler.add_job()``.
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
Note:
Daily is just an alias for "24 Hours". That means that if DST changes during that
interval, the job might not run at the time one would expect. It is always recommended
to pin servers to UTC time, then time related behaviour can always be expected.
For a note about DST, please see the documentation of `APScheduler`_.
.. _`APScheduler`: https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html
#daylight-saving-time-behavior
"""
job = Job(callback,
interval=datetime.timedelta(days=1),
repeat=True,
days=days,
tzinfo=time.tzinfo,
context=context,
name=name,
job_queue=self)
self._put(job, time_spec=time)
if not job_kwargs:
job_kwargs = {}
name = name or callback.__name__
job = Job(callback, context, name, self)
j = self.scheduler.add_job(callback,
name=name,
args=self._build_args(job),
trigger='cron',
day_of_week=','.join([str(d) for d in days]),
hour=time.hour,
minute=time.minute,
second=time.second,
timezone=time.tzinfo or self.scheduler.timezone,
**job_kwargs)
job.job = j
return job
def _set_next_peek(self, t):
# """
# Set next peek if not defined or `t` is before next peek.
# In case the next peek was set, also trigger the `self.__tick` event.
# """
with self.__next_peek_lock:
if not self._next_peek or self._next_peek > t:
self._next_peek = t
self.__tick.set()
def run_custom(self, callback, job_kwargs, context=None, name=None):
"""Creates a new customly defined ``Job``.
def tick(self):
"""Run all jobs that are due and re-enqueue them with their interval."""
now = time.time()
Args:
callback (:obj:`callable`): The callback function that should be executed by the new
job. Callback signature for context based API:
self.logger.debug('Ticking jobs with t=%f', now)
``def callback(CallbackContext)``
while True:
try:
t, job = self._queue.get(False)
except Empty:
break
``context.job`` is the :class:`telegram.ext.Job` instance. It can be used to access
its ``job.context`` or change it to a repeating job.
job_kwargs (:obj:`dict`): Arbitrary keyword arguments. Used as arguments for
``scheduler.add_job``.
context (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`.
name (:obj:`str`, optional): The name of the new job. Defaults to
``callback.__name__``.
self.logger.debug('Peeked at %s with t=%f', job.name, t)
Returns:
:class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job
queue.
if t > now:
# We can get here in two conditions:
# 1. At the second or later pass of the while loop, after we've already
# processed the job(s) we were supposed to at this time.
# 2. At the first iteration of the loop only if `self.put()` had triggered
# `self.__tick` because `self._next_peek` wasn't set
self.logger.debug("Next task isn't due yet. Finished!")
self._queue.put((t, job))
self._set_next_peek(t)
break
"""
name = name or callback.__name__
job = Job(callback, context, name, self)
if job.removed:
self.logger.debug('Removing job %s', job.name)
continue
j = self.scheduler.add_job(callback,
args=self._build_args(job),
name=name,
**job_kwargs)
if job.enabled:
try:
current_week_day = datetime.datetime.now(job.tzinfo).date().weekday()
if current_week_day in job.days:
self.logger.debug('Running job %s', job.name)
job.run(self._dispatcher)
self._dispatcher.update_persistence()
except Exception:
self.logger.exception('An uncaught error was raised while executing job %s',
job.name)
else:
self.logger.debug('Skipping disabled job %s', job.name)
if job.repeat and not job.removed:
self._put(job, previous_t=t)
elif job.is_monthly and not job.removed:
dt = datetime.datetime.now(tz=job.tzinfo)
dt_time = dt.time().replace(tzinfo=job.tzinfo)
self._put(job, time_spec=self._get_next_month_date(dt.day, job.day_is_strict,
dt_time))
else:
job._set_next_t(None)
self.logger.debug('Dropping non-repeating or removed job %s', job.name)
job.job = j
return job
def start(self):
"""Starts the job_queue thread."""
self.__start_lock.acquire()
if not self._running:
self._running = True
self.__start_lock.release()
self.__thread = Thread(target=self._main_loop,
name="Bot:{}:job_queue".format(self._dispatcher.bot.id))
self.__thread.start()
self.logger.debug('%s thread started', self.__class__.__name__)
else:
self.__start_lock.release()
def _main_loop(self):
"""
Thread target of thread ``job_queue``. Runs in background and performs ticks on the job
queue.
"""
while self._running:
# self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
tmout = self._next_peek - time.time() if self._next_peek else None
self._next_peek = None
self.__tick.clear()
self.__tick.wait(tmout)
# If we were woken up by self.stop(), just bail out
if not self._running:
break
self.tick()
self.logger.debug('%s thread stopped', self.__class__.__name__)
if not self.scheduler.running:
self.scheduler.start()
def stop(self):
"""Stops the thread."""
with self.__start_lock:
self._running = False
self.__tick.set()
if self.__thread is not None:
self.__thread.join()
if self.scheduler.running:
self.scheduler.shutdown()
def jobs(self):
"""Returns a tuple of all jobs that are currently in the ``JobQueue``."""
with self._queue.mutex:
return tuple(job[1] for job in self._queue.queue if job)
return tuple(Job.from_aps_job(job, self) for job in self.scheduler.get_jobs())
def get_jobs_by_name(self, name):
"""Returns a tuple of jobs with the given name that are currently in the ``JobQueue``"""
with self._queue.mutex:
return tuple(job[1] for job in self._queue.queue if job and job[1].name == name)
return tuple(job for job in self.jobs() if job.name == name)
class Job:
"""This class encapsulates a Job.
"""This class is a convenience wrapper for the jobs held in a :class:`telegram.ext.JobQueue`.
With the current backend APScheduler, :attr:`job` holds a :class:`apscheduler.job.Job`
instance.
Note:
* All attributes and instance methods of :attr:`job` are also directly available as
attributes/methods of the corresponding :class:`telegram.ext.Job` object.
* Two instances of :class:`telegram.ext.Job` are considered equal, if their corresponding
``job`` attributes have the same ``id``.
* If :attr:`job` isn't passed on initialization, it must be set manually afterwards for
this :class:`telegram.ext.Job` to be useful.
Attributes:
callback (:obj:`callable`): The callback function that should be executed by the new job.
context (:obj:`object`): Optional. Additional data needed for the callback function.
name (:obj:`str`): Optional. The name of the new job.
is_monthly (:obj: `bool`): Optional. Indicates whether it is a monthly job.
day_is_strict (:obj: `bool`): Optional. Indicates whether the monthly jobs day is strict.
job_queue (:class:`telegram.ext.JobQueue`): Optional. The ``JobQueue`` this job belongs to.
job (:class:`apscheduler.job.Job`): Optional. The APS Job this job is a wrapper for.
Args:
callback (:obj:`callable`): The callback function that should be executed by the new job.
@ -510,125 +461,72 @@ class Job:
a ``context.job`` is the :class:`telegram.ext.Job` instance. It can be used to access
its ``job.context`` or change it to a repeating job.
interval (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`, optional): The time
interval between executions of the job. If it is an :obj:`int` or a :obj:`float`,
it will be interpreted as seconds. If you don't set this value, you must set
:attr:`repeat` to :obj:`False` and specify :attr:`time_spec` when you put the job into
the job queue.
repeat (:obj:`bool`, optional): If this job should be periodically execute its callback
function (:obj:`True`) or only once (:obj:`False`). Defaults to :obj:`True`.
context (:obj:`object`, optional): Additional data needed for the callback function. Can be
accessed through ``job.context`` in the callback. Defaults to :obj:`None`.
name (:obj:`str`, optional): The name of the new job. Defaults to ``callback.__name__``.
days (Tuple[:obj:`int`], optional): Defines on which days of the week the job should run.
Defaults to ``Days.EVERY_DAY``
job_queue (:class:`telegram.ext.JobQueue`, optional): The ``JobQueue`` this job belongs to.
Only optional for backward compatibility with ``JobQueue.put()``.
tzinfo (:obj:`datetime.tzinfo`, optional): timezone associated to this job. Used when
checking the day of the week to determine whether a job should run (only relevant when
``days is not Days.EVERY_DAY``). Defaults to UTC.
is_monthly (:obj:`bool`, optional): If this job is supposed to be a monthly scheduled job.
Defaults to :obj:`False`.
day_is_strict (:obj:`bool`, optional): If :obj:`False` and day > month.days, will pick the
last day in the month. Defaults to :obj:`True`. Only relevant when ``is_monthly`` is
:obj:`True`.
job (:class:`apscheduler.job.Job`, optional): The APS Job this job is a wrapper for.
"""
def __init__(self,
callback,
interval=None,
repeat=True,
context=None,
days=Days.EVERY_DAY,
name=None,
job_queue=None,
tzinfo=None,
is_monthly=False,
day_is_strict=True):
job=None):
self.callback = callback
self.context = context
self.name = name or callback.__name__
self.job_queue = job_queue
self._repeat = None
self._interval = None
self.interval = interval
self._next_t = None
self.repeat = repeat
self.is_monthly = is_monthly
self.day_is_strict = day_is_strict
self._removed = False
self._enabled = False
self._days = None
self.days = days
self.tzinfo = tzinfo or datetime.timezone.utc
self._job_queue = weakref.proxy(job_queue) if job_queue is not None else None
self._remove = Event()
self._enabled = Event()
self._enabled.set()
self.job = job
def run(self, dispatcher):
"""Executes the callback function."""
if dispatcher.use_context:
self.callback(CallbackContext.from_job(self, dispatcher))
else:
self.callback(dispatcher.bot, self)
"""Executes the callback function independently of the jobs schedule."""
try:
if dispatcher.use_context:
self.callback(CallbackContext.from_job(self, dispatcher))
else:
self.callback(dispatcher.bot, self)
except Exception as e:
try:
dispatcher.dispatch_error(None, e)
# Errors should not stop the thread.
except Exception:
dispatcher.logger.exception('An error was raised while processing the job and an '
'uncaught error was raised while handling the error '
'with an error_handler.')
def schedule_removal(self):
"""
Schedules this job for removal from the ``JobQueue``. It will be removed without executing
its callback function again.
"""
self._remove.set()
self._next_t = None
self.job.remove()
self._removed = True
@property
def removed(self):
""":obj:`bool`: Whether this job is due to be removed."""
return self._remove.is_set()
return self._removed
@property
def enabled(self):
""":obj:`bool`: Whether this job is enabled."""
return self._enabled.is_set()
return self._enabled
@enabled.setter
def enabled(self, status):
if status:
self._enabled.set()
self.job.resume()
else:
self._enabled.clear()
@property
def interval(self):
"""
:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`: Optional. The interval in which the
job will run.
"""
return self._interval
@interval.setter
def interval(self, interval):
if interval is None and self.repeat:
raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'")
if not (interval is None or isinstance(interval, (Number, datetime.timedelta))):
raise TypeError("The 'interval' must be of type 'datetime.timedelta',"
" 'int' or 'float'")
self._interval = interval
@property
def interval_seconds(self):
""":obj:`int`: The interval for this job in seconds."""
interval = self.interval
if isinstance(interval, datetime.timedelta):
return interval.total_seconds()
else:
return interval
self.job.pause()
self._enabled = status
@property
def next_t(self):
@ -636,63 +534,25 @@ class Job:
:obj:`datetime.datetime`: Datetime for the next job execution.
Datetime is localized according to :attr:`tzinfo`.
If job is removed or already ran it equals to :obj:`None`.
"""
return datetime.datetime.fromtimestamp(self._next_t, self.tzinfo) if self._next_t else None
return self.job.next_run_time
def _set_next_t(self, next_t):
if isinstance(next_t, datetime.datetime):
# Set timezone to UTC in case datetime is in local timezone.
next_t = next_t.astimezone(datetime.timezone.utc)
next_t = to_float_timestamp(next_t)
elif not (isinstance(next_t, Number) or next_t is None):
raise TypeError("The 'next_t' argument should be one of the following types: "
"'float', 'int', 'datetime.datetime' or 'NoneType'")
self._next_t = next_t
@property
def repeat(self):
""":obj:`bool`: Optional. If this job should periodically execute its callback function."""
return self._repeat
@repeat.setter
def repeat(self, repeat):
if self.interval is None and repeat:
raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set")
self._repeat = repeat
@property
def days(self):
"""Tuple[:obj:`int`]: Optional. Defines on which days of the week the job should run."""
return self._days
@days.setter
def days(self, days):
if not isinstance(days, tuple):
raise TypeError("The 'days' argument should be of type 'tuple'")
if not all(isinstance(day, int) for day in days):
raise TypeError("The elements of the 'days' argument should be of type 'int'")
if not all(0 <= day <= 6 for day in days):
raise ValueError("The elements of the 'days' argument should be from 0 up to and "
"including 6")
self._days = days
@property
def job_queue(self):
""":class:`telegram.ext.JobQueue`: Optional. The ``JobQueue`` this job belongs to."""
return self._job_queue
@job_queue.setter
def job_queue(self, job_queue):
# Property setter for backward compatibility with JobQueue.put()
if not self._job_queue:
self._job_queue = weakref.proxy(job_queue)
@classmethod
def from_aps_job(cls, job, job_queue):
# context based callbacks
if len(job.args) == 1:
context = job.args[0].job.context
else:
raise RuntimeError("The 'job_queue' attribute can only be set once.")
context = job.args[1].context
return cls(job.func, context=context, name=job.name, job_queue=job_queue, job=job)
def __getattr__(self, item):
return getattr(self.job, item)
def __lt__(self, other):
return False
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.id == other.id
return False

View file

@ -25,6 +25,7 @@ from threading import Thread, Event
from time import sleep
import pytest
import pytz
from telegram import (Bot, Message, User, Chat, MessageEntity, Update,
InlineQuery, CallbackQuery, ShippingQuery, PreCheckoutQuery,
@ -271,14 +272,14 @@ def false_update(request):
return Update(update_id=1, **request.param)
@pytest.fixture(params=[1, 2], ids=lambda h: 'UTC +{hour:0>2}:00'.format(hour=h))
def utc_offset(request):
return datetime.timedelta(hours=request.param)
@pytest.fixture(params=['Europe/Berlin', 'Asia/Singapore', 'UTC'])
def tzinfo(request):
return pytz.timezone(request.param)
@pytest.fixture()
def timezone(utc_offset):
return datetime.timezone(utc_offset)
def timezone(tzinfo):
return tzinfo
def expect_bad_request(func, message, reason):

View file

@ -25,7 +25,7 @@ from telegram import (CallbackQuery, Chat, ChosenInlineResult, InlineQuery, Mess
PreCheckoutQuery, ShippingQuery, Update, User, MessageEntity)
from telegram.ext import (ConversationHandler, CommandHandler, CallbackQueryHandler,
MessageHandler, Filters, InlineQueryHandler, CallbackContext,
DispatcherHandlerStop, TypeHandler)
DispatcherHandlerStop, TypeHandler, JobQueue)
@pytest.fixture(scope='class')
@ -38,6 +38,15 @@ def user2():
return User(first_name='Mister Test', id=124, is_bot=False)
@pytest.fixture(autouse=True)
def start_stop_job_queue(dp):
dp.job_queue = JobQueue()
dp.job_queue.set_dispatcher(dp)
dp.job_queue.start()
yield
dp.job_queue.stop()
def raise_dphs(func):
def decorator(self, *args, **kwargs):
result = func(self, *args, **kwargs)
@ -610,8 +619,7 @@ class TestConversationHandler:
bot=bot)
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY
sleep(0.5)
dp.job_queue.tick()
sleep(0.65)
assert handler.conversations.get((self.group.id, user1.id)) is None
# Start state machine, do something, then reach timeout
@ -619,11 +627,9 @@ class TestConversationHandler:
assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY
message.text = '/brew'
message.entities[0].length = len('/brew')
dp.job_queue.tick()
dp.process_update(Update(update_id=2, message=message))
assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING
sleep(0.5)
dp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
def test_conversation_timeout_dispatcher_handler_stop(self, dp, bot, user1, caplog):
@ -645,8 +651,7 @@ class TestConversationHandler:
with caplog.at_level(logging.WARNING):
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY
sleep(0.5)
dp.job_queue.tick()
sleep(0.8)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert len(caplog.records) == 1
rec = caplog.records[-1]
@ -684,8 +689,7 @@ class TestConversationHandler:
timeout_handler.callback = timeout_callback
cdp.process_update(update)
sleep(0.5)
cdp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert self.is_timeout
@ -708,24 +712,20 @@ class TestConversationHandler:
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY
sleep(0.25) # t=.25
dp.job_queue.tick()
assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY
message.text = '/brew'
message.entities[0].length = len('/brew')
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING
sleep(0.35) # t=.6
dp.job_queue.tick()
assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING
message.text = '/pourCoffee'
message.entities[0].length = len('/pourCoffee')
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user1.id)) == self.DRINKING
sleep(.4) # t=1
dp.job_queue.tick()
assert handler.conversations.get((self.group.id, user1.id)) == self.DRINKING
sleep(.1) # t=1.1
dp.job_queue.tick()
sleep(.2) # t=1.2
assert handler.conversations.get((self.group.id, user1.id)) is None
def test_conversation_timeout_two_users(self, dp, bot, user1, user2):
@ -744,16 +744,13 @@ class TestConversationHandler:
message.entities[0].length = len('/brew')
message.entities[0].length = len('/brew')
message.from_user = user2
dp.job_queue.tick()
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user2.id)) is None
message.text = '/start'
message.entities[0].length = len('/start')
dp.job_queue.tick()
dp.process_update(Update(update_id=0, message=message))
assert handler.conversations.get((self.group.id, user2.id)) == self.THIRSTY
sleep(0.5)
dp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert handler.conversations.get((self.group.id, user2.id)) is None
@ -776,8 +773,7 @@ class TestConversationHandler:
message.text = '/brew'
message.entities[0].length = len('/brew')
dp.process_update(Update(update_id=0, message=message))
sleep(0.5)
dp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert self.is_timeout
@ -786,8 +782,7 @@ class TestConversationHandler:
message.text = '/start'
message.entities[0].length = len('/start')
dp.process_update(Update(update_id=1, message=message))
sleep(0.5)
dp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert self.is_timeout
@ -800,8 +795,7 @@ class TestConversationHandler:
message.text = '/startCoding'
message.entities[0].length = len('/startCoding')
dp.process_update(Update(update_id=0, message=message))
sleep(0.5)
dp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert not self.is_timeout
@ -824,8 +818,7 @@ class TestConversationHandler:
message.text = '/brew'
message.entities[0].length = len('/brew')
cdp.process_update(Update(update_id=0, message=message))
sleep(0.5)
cdp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert self.is_timeout
@ -834,8 +827,7 @@ class TestConversationHandler:
message.text = '/start'
message.entities[0].length = len('/start')
cdp.process_update(Update(update_id=1, message=message))
sleep(0.5)
cdp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert self.is_timeout
@ -848,8 +840,7 @@ class TestConversationHandler:
message.text = '/startCoding'
message.entities[0].length = len('/startCoding')
cdp.process_update(Update(update_id=0, message=message))
sleep(0.5)
cdp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert not self.is_timeout
@ -865,7 +856,6 @@ class TestConversationHandler:
def slowbrew(_bot, update):
sleep(0.25)
# Let's give to the original timeout a chance to execute
dp.job_queue.tick()
sleep(0.25)
# By returning None we do not override the conversation state so
# we can see if the timeout has been executed
@ -887,16 +877,13 @@ class TestConversationHandler:
bot=bot)
dp.process_update(Update(update_id=0, message=message))
sleep(0.25)
dp.job_queue.tick()
message.text = '/slowbrew'
message.entities[0].length = len('/slowbrew')
dp.process_update(Update(update_id=0, message=message))
dp.job_queue.tick()
assert handler.conversations.get((self.group.id, user1.id)) is not None
assert not self.is_timeout
sleep(0.5)
dp.job_queue.tick()
sleep(0.6)
assert handler.conversations.get((self.group.id, user1.id)) is None
assert self.is_timeout

View file

@ -86,9 +86,10 @@ class TestHelpers:
"""Conversion from timezone-aware datetime to timestamp"""
# we're parametrizing this with two different UTC offsets to exclude the possibility
# of an xpass when the test is run in a timezone with the same UTC offset
datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10**5, tzinfo=timezone)
test_datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10**5)
datetime = timezone.localize(test_datetime)
assert (helpers.to_float_timestamp(datetime)
== 1573431976.1 - timezone.utcoffset(None).total_seconds())
== 1573431976.1 - timezone.utcoffset(test_datetime).total_seconds())
def test_to_float_timestamp_absolute_no_reference(self):
"""A reference timestamp is only relevant for relative time specifications"""
@ -116,14 +117,15 @@ class TestHelpers:
"""Conversion from timezone-aware time-of-day specification to timestamp"""
# we're parametrizing this with two different UTC offsets to exclude the possibility
# of an xpass when the test is run in a timezone with the same UTC offset
utc_offset = timezone.utcoffset(None)
ref_datetime = dtm.datetime(1970, 1, 1, 12)
utc_offset = timezone.utcoffset(ref_datetime)
ref_t, time_of_day = _datetime_to_float_timestamp(ref_datetime), ref_datetime.time()
aware_time_of_day = timezone.localize(ref_datetime).timetz()
# first test that naive time is assumed to be utc:
assert helpers.to_float_timestamp(time_of_day, ref_t) == pytest.approx(ref_t)
# test that by setting the timezone the timestamp changes accordingly:
assert (helpers.to_float_timestamp(time_of_day.replace(tzinfo=timezone), ref_t)
assert (helpers.to_float_timestamp(aware_time_of_day, ref_t)
== pytest.approx(ref_t + (-utc_offset.total_seconds() % (24 * 60 * 60))))
@pytest.mark.parametrize('time_spec', RELATIVE_TIME_SPECS, ids=str)
@ -149,9 +151,10 @@ class TestHelpers:
def test_from_timestamp_aware(self, timezone):
# we're parametrizing this with two different UTC offsets to exclude the possibility
# of an xpass when the test is run in a timezone with the same UTC offset
datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10**5, tzinfo=timezone)
assert (helpers.from_timestamp(1573431976.1 - timezone.utcoffset(None).total_seconds())
== datetime)
test_datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10 ** 5)
datetime = timezone.localize(test_datetime)
assert (helpers.from_timestamp(
1573431976.1 - timezone.utcoffset(test_datetime).total_seconds()) == datetime)
def test_create_deep_linked_url(self):
username = 'JamesTheMock'

View file

@ -51,8 +51,7 @@ class TestInputFile:
def test_mimetypes(self):
# Only test a few to make sure logic works okay
assert InputFile(open('tests/data/telegram.jpg', 'rb')).mimetype == 'image/jpeg'
if sys.version_info >= (3, 5):
assert InputFile(open('tests/data/telegram.webp', 'rb')).mimetype == 'image/webp'
assert InputFile(open('tests/data/telegram.webp', 'rb')).mimetype == 'image/webp'
assert InputFile(open('tests/data/telegram.mp3', 'rb')).mimetype == 'audio/mpeg'
# Test guess from file

View file

@ -18,15 +18,17 @@
# along with this program. If not, see [http://www.gnu.org/licenses/].
import calendar
import datetime as dtm
import logging
import os
import time
from queue import Queue
from time import sleep
import pytest
import pytz
from apscheduler.schedulers import SchedulerNotRunningError
from flaky import flaky
from telegram.ext import JobQueue, Updater, Job, CallbackContext
from telegram.utils.deprecate import TelegramDeprecationWarning
@pytest.fixture(scope='function')
@ -44,16 +46,18 @@ def job_queue(bot, _dp):
class TestJobQueue:
result = 0
job_time = 0
received_error = None
@pytest.fixture(autouse=True)
def reset(self):
self.result = 0
self.job_time = 0
self.received_error = None
def job_run_once(self, bot, job):
self.result += 1
def job_with_exception(self, bot, job):
def job_with_exception(self, bot, job=None):
raise Exception('Test Error')
def job_remove_self(self, bot, job):
@ -74,32 +78,32 @@ class TestJobQueue:
and context.chat_data is None
and context.user_data is None
and isinstance(context.bot_data, dict)
and context.job_queue is context.job.job_queue):
and context.job_queue is not context.job.job_queue):
self.result += 1
def error_handler(self, bot, update, error):
self.received_error = str(error)
def error_handler_context(self, update, context):
self.received_error = str(context.error)
def error_handler_raise_error(self, *args):
raise Exception('Failing bigly')
def test_run_once(self, job_queue):
job_queue.run_once(self.job_run_once, 0.01)
sleep(0.02)
assert self.result == 1
def test_run_once_timezone(self, job_queue, timezone):
"""Test the correct handling of aware datetimes.
Set the target datetime to utcnow + x hours (naive) with the timezone set to utc + x hours,
which is equivalent to now.
"""
"""Test the correct handling of aware datetimes"""
# we're parametrizing this with two different UTC offsets to exclude the possibility
# of an xpass when the test is run in a timezone with the same UTC offset
when = (dtm.datetime.utcnow() + timezone.utcoffset(None)).replace(tzinfo=timezone)
when = dtm.datetime.now(timezone)
job_queue.run_once(self.job_run_once, when)
sleep(0.001)
assert self.result == 1
def test_run_once_no_time_spec(self, job_queue):
# test that an appropiate exception is raised if a job is attempted to be scheduled
# without specifying a time
with pytest.raises(ValueError):
job_queue.run_once(self.job_run_once, when=None)
def test_job_with_context(self, job_queue):
job_queue.run_once(self.job_run_once_with_context, 0.01, context=5)
sleep(0.02)
@ -117,18 +121,43 @@ class TestJobQueue:
sleep(0.07)
assert self.result == 1
def test_run_repeating_first_immediate(self, job_queue):
job_queue.run_repeating(self.job_run_once, 0.1, first=0)
sleep(0.05)
assert self.result == 1
def test_run_repeating_first_timezone(self, job_queue, timezone):
"""Test correct scheduling of job when passing a timezone-aware datetime as ``first``"""
first = (dtm.datetime.utcnow() + timezone.utcoffset(None)).replace(tzinfo=timezone)
job_queue.run_repeating(self.job_run_once, 0.05, first=first)
sleep(0.001)
job_queue.run_repeating(self.job_run_once, 0.1,
first=dtm.datetime.now(timezone) + dtm.timedelta(seconds=0.05))
sleep(0.1)
assert self.result == 1
def test_run_repeating_last(self, job_queue):
job_queue.run_repeating(self.job_run_once, 0.05, last=0.06)
sleep(0.1)
assert self.result == 1
sleep(0.1)
assert self.result == 1
def test_run_repeating_last_timezone(self, job_queue, timezone):
"""Test correct scheduling of job when passing a timezone-aware datetime as ``first``"""
job_queue.run_repeating(self.job_run_once, 0.05,
last=dtm.datetime.now(timezone) + dtm.timedelta(seconds=0.06))
sleep(0.1)
assert self.result == 1
sleep(0.1)
assert self.result == 1
def test_run_repeating_last_before_first(self, job_queue):
with pytest.raises(ValueError, match="'last' must not be before 'first'!"):
job_queue.run_repeating(self.job_run_once, 0.05, first=1, last=0.5)
def test_run_repeating_timedelta(self, job_queue):
job_queue.run_repeating(self.job_run_once, dtm.timedelta(minutes=3.3333e-4))
sleep(0.05)
assert self.result == 2
def test_run_custom(self, job_queue):
job_queue.run_custom(self.job_run_once, {'trigger': 'interval', 'seconds': 0.02})
sleep(0.05)
assert self.result == 2
def test_multiple(self, job_queue):
job_queue.run_once(self.job_run_once, 0.01)
job_queue.run_once(self.job_run_once, 0.02)
@ -198,7 +227,10 @@ class TestJobQueue:
sleep(1)
assert self.result == 1
finally:
u.stop()
try:
u.stop()
except SchedulerNotRunningError:
pass
def test_time_unit_int(self, job_queue):
# Testing seconds in int
@ -221,9 +253,9 @@ class TestJobQueue:
def test_time_unit_dt_datetime(self, job_queue):
# Testing running at a specific datetime
delta, now = dtm.timedelta(seconds=0.05), time.time()
when = dtm.datetime.utcfromtimestamp(now) + delta
expected_time = now + delta.total_seconds()
delta, now = dtm.timedelta(seconds=0.05), dtm.datetime.now(pytz.utc)
when = now + delta
expected_time = (now + delta).timestamp()
job_queue.run_once(self.job_datetime_tests, when)
sleep(0.06)
@ -231,9 +263,10 @@ class TestJobQueue:
def test_time_unit_dt_time_today(self, job_queue):
# Testing running at a specific time today
delta, now = 0.05, time.time()
when = (dtm.datetime.utcfromtimestamp(now) + dtm.timedelta(seconds=delta)).time()
expected_time = now + delta
delta, now = 0.05, dtm.datetime.now(pytz.utc)
expected_time = now + dtm.timedelta(seconds=delta)
when = expected_time.time()
expected_time = expected_time.timestamp()
job_queue.run_once(self.job_datetime_tests, when)
sleep(0.06)
@ -242,262 +275,197 @@ class TestJobQueue:
def test_time_unit_dt_time_tomorrow(self, job_queue):
# Testing running at a specific time that has passed today. Since we can't wait a day, we
# test if the job's next scheduled execution time has been calculated correctly
delta, now = -2, time.time()
when = (dtm.datetime.utcfromtimestamp(now) + dtm.timedelta(seconds=delta)).time()
expected_time = now + delta + 60 * 60 * 24
delta, now = -2, dtm.datetime.now(pytz.utc)
when = (now + dtm.timedelta(seconds=delta)).time()
expected_time = (now + dtm.timedelta(seconds=delta, days=1)).timestamp()
job_queue.run_once(self.job_datetime_tests, when)
assert job_queue._queue.get(False)[0] == pytest.approx(expected_time)
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_time)
def test_run_daily(self, job_queue):
delta, now = 0.1, time.time()
time_of_day = (dtm.datetime.utcfromtimestamp(now) + dtm.timedelta(seconds=delta)).time()
expected_reschedule_time = now + delta + 24 * 60 * 60
delta, now = 1, dtm.datetime.now(pytz.utc)
time_of_day = (now + dtm.timedelta(seconds=delta)).time()
expected_reschedule_time = (now + dtm.timedelta(seconds=delta, days=1)).timestamp()
job_queue.run_daily(self.job_run_once, time_of_day)
sleep(0.2)
assert self.result == 1
assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time)
def test_run_daily_with_timezone(self, job_queue):
"""test that the weekday is retrieved based on the job's timezone
We set a job to run at the current UTC time of day (plus a small delay buffer) with a
timezone that is---approximately (see below)---UTC +24, and set it to run on the weekday
after the current UTC weekday. The job should therefore be executed now (because in UTC+24,
the time of day is the same as the current weekday is the one after the current UTC
weekday).
"""
now = time.time()
utcnow = dtm.datetime.utcfromtimestamp(now)
delta = 0.1
# must subtract one minute because the UTC offset has to be strictly less than 24h
# thus this test will xpass if run in the interval [00:00, 00:01) UTC time
# (because target time will be 23:59 UTC, so local and target weekday will be the same)
target_tzinfo = dtm.timezone(dtm.timedelta(days=1, minutes=-1))
target_datetime = (utcnow + dtm.timedelta(days=1, minutes=-1, seconds=delta)).replace(
tzinfo=target_tzinfo)
target_time = target_datetime.timetz()
target_weekday = target_datetime.date().weekday()
expected_reschedule_time = now + delta + 24 * 60 * 60
job_queue.run_daily(self.job_run_once, time=target_time, days=(target_weekday,))
sleep(delta + 0.1)
assert self.result == 1
assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time)
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_reschedule_time)
def test_run_monthly(self, job_queue):
delta, now = 0.1, time.time()
date_time = dtm.datetime.utcfromtimestamp(now)
time_of_day = (date_time + dtm.timedelta(seconds=delta)).time()
expected_reschedule_time = now + delta
def test_run_monthly(self, job_queue, timezone):
delta, now = 1, dtm.datetime.now(timezone)
expected_reschedule_time = now + dtm.timedelta(seconds=delta)
time_of_day = expected_reschedule_time.time().replace(tzinfo=timezone)
day = date_time.day
expected_reschedule_time += calendar.monthrange(date_time.year,
date_time.month)[1] * 24 * 60 * 60
day = now.day
expected_reschedule_time += dtm.timedelta(calendar.monthrange(now.year, now.month)[1])
expected_reschedule_time = expected_reschedule_time.timestamp()
job_queue.run_monthly(self.job_run_once, time_of_day, day)
sleep(0.2)
assert self.result == 1
assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time)
def test_run_monthly_and_not_strict(self, job_queue):
# This only really tests something in months with < 31 days.
# But the trouble of patching datetime is probably not worth it
delta, now = 0.1, time.time()
date_time = dtm.datetime.utcfromtimestamp(now)
time_of_day = (date_time + dtm.timedelta(seconds=delta)).time()
expected_reschedule_time = now + delta
day = date_time.day
date_time += dtm.timedelta(calendar.monthrange(date_time.year,
date_time.month)[1] - day)
# next job should be scheduled on last day of month if day_is_strict is False
expected_reschedule_time += (calendar.monthrange(date_time.year,
date_time.month)[1] - day) * 24 * 60 * 60
job_queue.run_monthly(self.job_run_once, time_of_day, 31, day_is_strict=False)
assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time)
def test_run_monthly_with_timezone(self, job_queue):
"""test that the day is retrieved based on the job's timezone
We set a job to run at the current UTC time of day (plus a small delay buffer) with a
timezone that is---approximately (see below)---UTC +24, and set it to run on the weekday
after the current UTC weekday. The job should therefore be executed now (because in UTC+24,
the time of day is the same as the current weekday is the one after the current UTC
weekday).
"""
now = time.time()
utcnow = dtm.datetime.utcfromtimestamp(now)
delta = 0.1
# must subtract one minute because the UTC offset has to be strictly less than 24h
# thus this test will xpass if run in the interval [00:00, 00:01) UTC time
# (because target time will be 23:59 UTC, so local and target weekday will be the same)
target_tzinfo = dtm.timezone(dtm.timedelta(days=1, minutes=-1))
target_datetime = (utcnow + dtm.timedelta(days=1, minutes=-1, seconds=delta)).replace(
tzinfo=target_tzinfo)
target_time = target_datetime.timetz()
target_day = target_datetime.day
expected_reschedule_time = now + delta
expected_reschedule_time += calendar.monthrange(target_datetime.year,
target_datetime.month)[1] * 24 * 60 * 60
job_queue.run_monthly(self.job_run_once, target_time, target_day)
sleep(delta + 0.1)
assert self.result == 1
assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time)
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_reschedule_time)
def test_warnings(self, job_queue):
j = Job(self.job_run_once, repeat=False)
with pytest.raises(ValueError, match='can not be set to'):
j.repeat = True
j.interval = 15
assert j.interval_seconds == 15
j.repeat = True
with pytest.raises(ValueError, match='can not be'):
j.interval = None
j.repeat = False
with pytest.raises(TypeError, match='must be of type'):
j.interval = 'every 3 minutes'
j.interval = 15
assert j.interval_seconds == 15
def test_run_monthly_non_strict_day(self, job_queue, timezone):
delta, now = 1, dtm.datetime.now(timezone)
expected_reschedule_time = now + dtm.timedelta(seconds=delta)
time_of_day = expected_reschedule_time.time().replace(tzinfo=timezone)
with pytest.raises(TypeError, match='argument should be of type'):
j.days = 'every day'
with pytest.raises(TypeError, match='The elements of the'):
j.days = ('mon', 'wed')
with pytest.raises(ValueError, match='from 0 up to and'):
j.days = (0, 6, 12, 14)
expected_reschedule_time += (dtm.timedelta(calendar.monthrange(now.year, now.month)[1])
- dtm.timedelta(days=now.day))
# Adjust the hour for the special case that between now & end of month a DST switch happens
expected_reschedule_time = timezone.normalize(expected_reschedule_time)
expected_reschedule_time += dtm.timedelta(
hours=time_of_day.hour - expected_reschedule_time.hour)
expected_reschedule_time = expected_reschedule_time.timestamp()
with pytest.raises(TypeError, match='argument should be one of the'):
j._set_next_t('tomorrow')
job_queue.run_monthly(self.job_run_once, time_of_day, 31, day_is_strict=False)
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_reschedule_time)
def test_get_jobs(self, job_queue):
job1 = job_queue.run_once(self.job_run_once, 10, name='name1')
job2 = job_queue.run_once(self.job_run_once, 10, name='name1')
job3 = job_queue.run_once(self.job_run_once, 10, name='name2')
@pytest.mark.parametrize('use_context', [True, False])
def test_get_jobs(self, job_queue, use_context):
job_queue._dispatcher.use_context = use_context
if use_context:
callback = self.job_context_based_callback
else:
callback = self.job_run_once
job1 = job_queue.run_once(callback, 10, name='name1')
job2 = job_queue.run_once(callback, 10, name='name1')
job3 = job_queue.run_once(callback, 10, name='name2')
assert job_queue.jobs() == (job1, job2, job3)
assert job_queue.get_jobs_by_name('name1') == (job1, job2)
assert job_queue.get_jobs_by_name('name2') == (job3,)
def test_bot_in_init_deprecation(self, bot):
with pytest.warns(TelegramDeprecationWarning):
JobQueue(bot)
def test_context_based_callback(self, job_queue):
job_queue.run_once(self.job_context_based_callback, 0.01, context=2)
job_queue._dispatcher.use_context = True
job_queue.run_once(self.job_context_based_callback, 0.01, context=2)
sleep(0.03)
assert self.result == 0
def test_job_default_tzinfo(self, job_queue):
"""Test that default tzinfo is always set to UTC"""
job_1 = job_queue.run_once(self.job_run_once, 0.01)
job_2 = job_queue.run_repeating(self.job_run_once, 10)
job_3 = job_queue.run_daily(self.job_run_once, time=dtm.time(hour=15))
jobs = [job_1, job_2, job_3]
for job in jobs:
assert job.tzinfo == dtm.timezone.utc
def test_job_next_t_property(self, job_queue):
# Testing:
# - next_t values match values from self._queue.queue (for run_once and run_repeating jobs)
# - next_t equals None if job is removed or if it's already ran
job1 = job_queue.run_once(self.job_run_once, 0.06, name='run_once job')
job2 = job_queue.run_once(self.job_run_once, 0.06, name='canceled run_once job')
job_queue.run_repeating(self.job_run_once, 0.04, name='repeatable job')
sleep(0.05)
job2.schedule_removal()
with job_queue._queue.mutex:
for t, job in job_queue._queue.queue:
t = dtm.datetime.fromtimestamp(t, job.tzinfo)
if job.removed:
assert job.next_t is None
else:
assert job.next_t == t
assert self.result == 1
sleep(0.02)
job_queue._dispatcher.use_context = False
@pytest.mark.parametrize('use_context', [True, False])
def test_job_run(self, _dp, use_context):
_dp.use_context = use_context
job_queue = JobQueue()
job_queue.set_dispatcher(_dp)
if use_context:
job = job_queue.run_repeating(self.job_context_based_callback, 0.02, context=2)
else:
job = job_queue.run_repeating(self.job_run_once, 0.02, context=2)
assert self.result == 0
job.run(_dp)
assert self.result == 1
def test_enable_disable_job(self, job_queue):
job = job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.05)
assert self.result == 2
assert job1.next_t is None
assert job2.next_t is None
job.enabled = False
assert not job.enabled
sleep(0.05)
assert self.result == 2
job.enabled = True
assert job.enabled
sleep(0.05)
assert self.result == 4
def test_job_set_next_t(self, job_queue):
# Testing next_t setter for 'datetime.datetime' values
def test_remove_job(self, job_queue):
job = job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.05)
assert self.result == 2
assert not job.removed
job.schedule_removal()
assert job.removed
sleep(0.05)
assert self.result == 2
job = job_queue.run_once(self.job_run_once, 0.05)
def test_job_lt_eq(self, job_queue):
job = job_queue.run_repeating(self.job_run_once, 0.02)
assert not job == job_queue
assert not job < job
t = dtm.datetime.now(tz=dtm.timezone(dtm.timedelta(hours=12)))
job._set_next_t(t)
job.tzinfo = dtm.timezone(dtm.timedelta(hours=5))
assert job.next_t == t.astimezone(job.tzinfo)
def test_dispatch_error(self, job_queue, dp):
dp.add_error_handler(self.error_handler)
def test_passing_tzinfo_to_job(self, job_queue):
"""Test that tzinfo is correctly passed to job with run_once, run_daily, run_repeating
and run_monthly methods"""
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(.1)
assert self.received_error == 'Test Error'
self.received_error = None
job.run(dp)
assert self.received_error == 'Test Error'
when_dt_tz_specific = dtm.datetime.now(
tz=dtm.timezone(dtm.timedelta(hours=12))
) + dtm.timedelta(seconds=2)
when_dt_tz_utc = dtm.datetime.now() + dtm.timedelta(seconds=2)
job_once1 = job_queue.run_once(self.job_run_once, when_dt_tz_specific)
job_once2 = job_queue.run_once(self.job_run_once, when_dt_tz_utc)
# Remove handler
dp.remove_error_handler(self.error_handler)
self.received_error = None
when_time_tz_specific = (dtm.datetime.now(
tz=dtm.timezone(dtm.timedelta(hours=12))
) + dtm.timedelta(seconds=2)).timetz()
when_time_tz_utc = (dtm.datetime.now() + dtm.timedelta(seconds=2)).timetz()
job_once3 = job_queue.run_once(self.job_run_once, when_time_tz_specific)
job_once4 = job_queue.run_once(self.job_run_once, when_time_tz_utc)
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(.1)
assert self.received_error is None
job.run(dp)
assert self.received_error is None
first_dt_tz_specific = dtm.datetime.now(
tz=dtm.timezone(dtm.timedelta(hours=12))
) + dtm.timedelta(seconds=2)
first_dt_tz_utc = dtm.datetime.now() + dtm.timedelta(seconds=2)
job_repeating1 = job_queue.run_repeating(
self.job_run_once, 2, first=first_dt_tz_specific)
job_repeating2 = job_queue.run_repeating(
self.job_run_once, 2, first=first_dt_tz_utc)
def test_dispatch_error_context(self, job_queue, cdp):
cdp.add_error_handler(self.error_handler_context)
first_time_tz_specific = (dtm.datetime.now(
tz=dtm.timezone(dtm.timedelta(hours=12))
) + dtm.timedelta(seconds=2)).timetz()
first_time_tz_utc = (dtm.datetime.now() + dtm.timedelta(seconds=2)).timetz()
job_repeating3 = job_queue.run_repeating(
self.job_run_once, 2, first=first_time_tz_specific)
job_repeating4 = job_queue.run_repeating(
self.job_run_once, 2, first=first_time_tz_utc)
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(.1)
assert self.received_error == 'Test Error'
self.received_error = None
job.run(cdp)
assert self.received_error == 'Test Error'
time_tz_specific = (dtm.datetime.now(
tz=dtm.timezone(dtm.timedelta(hours=12))
) + dtm.timedelta(seconds=2)).timetz()
time_tz_utc = (dtm.datetime.now() + dtm.timedelta(seconds=2)).timetz()
job_daily1 = job_queue.run_daily(self.job_run_once, time_tz_specific)
job_daily2 = job_queue.run_daily(self.job_run_once, time_tz_utc)
# Remove handler
cdp.remove_error_handler(self.error_handler_context)
self.received_error = None
job_monthly1 = job_queue.run_monthly(self.job_run_once, time_tz_specific, 1)
job_monthly2 = job_queue.run_monthly(self.job_run_once, time_tz_utc, 1)
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(.1)
assert self.received_error is None
job.run(cdp)
assert self.received_error is None
assert job_once1.tzinfo == when_dt_tz_specific.tzinfo
assert job_once2.tzinfo == dtm.timezone.utc
assert job_once3.tzinfo == when_time_tz_specific.tzinfo
assert job_once4.tzinfo == dtm.timezone.utc
assert job_repeating1.tzinfo == first_dt_tz_specific.tzinfo
assert job_repeating2.tzinfo == dtm.timezone.utc
assert job_repeating3.tzinfo == first_time_tz_specific.tzinfo
assert job_repeating4.tzinfo == dtm.timezone.utc
assert job_daily1.tzinfo == time_tz_specific.tzinfo
assert job_daily2.tzinfo == dtm.timezone.utc
assert job_monthly1.tzinfo == time_tz_specific.tzinfo
assert job_monthly2.tzinfo == dtm.timezone.utc
def test_dispatch_error_that_raises_errors(self, job_queue, dp, caplog):
dp.add_error_handler(self.error_handler_raise_error)
with caplog.at_level(logging.ERROR):
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(.1)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'processing the job' in rec.msg
assert 'uncaught error was raised while handling' in rec.msg
caplog.clear()
with caplog.at_level(logging.ERROR):
job.run(dp)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'processing the job' in rec.msg
assert 'uncaught error was raised while handling' in rec.msg
caplog.clear()
# Remove handler
dp.remove_error_handler(self.error_handler_raise_error)
self.received_error = None
with caplog.at_level(logging.ERROR):
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(.1)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'No error handlers are registered' in rec.msg
caplog.clear()
with caplog.at_level(logging.ERROR):
job.run(dp)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'No error handlers are registered' in rec.msg

View file

@ -17,7 +17,6 @@
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import signal
import sys
from telegram.utils.helpers import encode_conversations_to_json
@ -1169,7 +1168,6 @@ class TestDictPersistence:
assert dict_persistence.bot_data == bot_data
assert dict_persistence.conversations == conversations
@pytest.mark.skipif(sys.version_info < (3, 6), reason="dicts are not ordered in py<=3.5")
def test_json_outputs(self, user_data_json, chat_data_json, bot_data_json, conversations_json):
dict_persistence = DictPersistence(user_data_json=user_data_json,
chat_data_json=chat_data_json,
@ -1180,7 +1178,6 @@ class TestDictPersistence:
assert dict_persistence.bot_data_json == bot_data_json
assert dict_persistence.conversations_json == conversations_json
@pytest.mark.skipif(sys.version_info < (3, 6), reason="dicts are not ordered in py<=3.5")
def test_json_changes(self, user_data, user_data_json, chat_data, chat_data_json,
bot_data, bot_data_json,
conversations, conversations_json):

View file

@ -513,10 +513,14 @@ class TestUpdater:
with caplog.at_level(logging.INFO):
updater.idle()
rec = caplog.records[-1]
rec = caplog.records[-2]
assert rec.msg.startswith('Received signal {}'.format(signal.SIGTERM))
assert rec.levelname == 'INFO'
rec = caplog.records[-1]
assert rec.msg.startswith('Scheduler has been shut down')
assert rec.levelname == 'INFO'
# If we get this far, idle() ran through
sleep(.5)
assert updater.running is False