diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 40696417e..961c50eb3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,7 +15,7 @@ jobs: runs-on: ${{matrix.os}} strategy: matrix: - python-version: [3.5, 3.6, 3.7, 3.8] + python-version: [3.6, 3.7, 3.8] os: [ubuntu-latest, windows-latest] include: - os: ubuntu-latest diff --git a/README.rst b/README.rst index bb7215ebb..f5589ce6e 100644 --- a/README.rst +++ b/README.rst @@ -83,7 +83,7 @@ Introduction This library provides a pure Python interface for the `Telegram Bot API `_. -It's compatible with Python versions 3.5+. PTB might also work on `PyPy `_, though there have been a lot of issues before. Hence, PyPy is not officially supported. +It's compatible with Python versions 3.6+. PTB might also work on `PyPy `_, though there have been a lot of issues before. Hence, PyPy is not officially supported. In addition to the pure API implementation, this library features a number of high-level classes to make the development of bots easy and straightforward. These classes are contained in the diff --git a/requirements.txt b/requirements.txt index ac9fb7cc1..8950b52f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ certifi tornado>=5.1 cryptography decorator>=4.4.0 +APScheduler==3.6.3 diff --git a/setup.py b/setup.py index 97c6045ac..2f5243123 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,6 @@ with codecs.open('README.rst', 'r', 'utf-8') as fd: 'Topic :: Internet', 'Programming Language :: Python', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index b72193ea8..4093b557e 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -18,19 +18,16 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. """This module contains the classes JobQueue and Job.""" -import calendar import datetime import logging -import time -import warnings -import weakref -from numbers import Number -from queue import PriorityQueue, Empty -from threading import Thread, Lock, Event +import pytz + +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.combining import OrTrigger +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from telegram.ext.callbackcontext import CallbackContext -from telegram.utils.deprecate import TelegramDeprecationWarning -from telegram.utils.helpers import to_float_timestamp class Days: @@ -39,36 +36,66 @@ class Days: class JobQueue: - """This class allows you to periodically perform tasks with the bot. + """This class allows you to periodically perform tasks with the bot. It is a convenience + wrapper for the APScheduler library. Attributes: - _queue (:obj:`PriorityQueue`): The queue that holds the Jobs. + scheduler (:class:`apscheduler.schedulers.background.BackgroundScheduler`): The APScheduler bot (:class:`telegram.Bot`): The bot instance that should be passed to the jobs. DEPRECATED: Use :attr:`set_dispatcher` instead. """ - def __init__(self, bot=None): - self._queue = PriorityQueue() - if bot: - warnings.warn("Passing bot to jobqueue is deprecated. Please use set_dispatcher " - "instead!", TelegramDeprecationWarning, stacklevel=2) - - class MockDispatcher: - def __init__(self): - self.bot = bot - self.use_context = False - - self._dispatcher = MockDispatcher() - else: - self._dispatcher = None + def __init__(self): + self._dispatcher = None self.logger = logging.getLogger(self.__class__.__name__) - self.__start_lock = Lock() - self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick - self.__tick = Event() - self.__thread = None - self._next_peek = None - self._running = False + self.scheduler = BackgroundScheduler(timezone=pytz.utc) + self.scheduler.add_listener(self._update_persistence, + mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) + + # Dispatch errors and don't log them in the APS logger + def aps_log_filter(record): + return 'raised an exception' not in record.msg + + logging.getLogger('apscheduler.executors.default').addFilter(aps_log_filter) + self.scheduler.add_listener(self._dispatch_error, EVENT_JOB_ERROR) + + def _build_args(self, job): + if self._dispatcher.use_context: + return [CallbackContext.from_job(job, self._dispatcher)] + return [self._dispatcher.bot, job] + + def _tz_now(self): + return datetime.datetime.now(self.scheduler.timezone) + + def _update_persistence(self, event): + self._dispatcher.update_persistence() + + def _dispatch_error(self, event): + try: + self._dispatcher.dispatch_error(None, event.exception) + # Errors should not stop the thread. + except Exception: + self.logger.exception('An error was raised while processing the job and an ' + 'uncaught error was raised while handling the error ' + 'with an error_handler.') + + def _parse_time_input(self, time, shift_day=False): + if time is None: + return None + if isinstance(time, (int, float)): + return self._tz_now() + datetime.timedelta(seconds=time) + if isinstance(time, datetime.timedelta): + return self._tz_now() + time + if isinstance(time, datetime.time): + dt = datetime.datetime.combine( + datetime.datetime.now(tz=time.tzinfo or self.scheduler.timezone).date(), time, + tzinfo=time.tzinfo or self.scheduler.timezone) + if shift_day and dt <= datetime.datetime.now(pytz.utc): + dt += datetime.timedelta(days=1) + return dt + # isinstance(time, datetime.datetime): + return time def set_dispatcher(self, dispatcher): """Set the dispatcher to be used by this JobQueue. Use this instead of passing a @@ -80,37 +107,7 @@ class JobQueue: """ self._dispatcher = dispatcher - def _put(self, job, time_spec=None, previous_t=None): - """ - Enqueues the job, scheduling its next run at the correct time. - - Args: - job (telegram.ext.Job): job to enqueue - time_spec (optional): - Specification of the time for which the job should be scheduled. The precise - semantics of this parameter depend on its type (see - :func:`telegram.ext.JobQueue.run_repeating` for details). - Defaults to now + ``job.interval``. - previous_t (optional): - Time at which the job last ran (:obj:`None` if it hasn't run yet). - - """ - # get time at which to run: - if time_spec is None: - time_spec = job.interval - if time_spec is None: - raise ValueError("no time specification given for scheduling non-repeating job") - next_t = to_float_timestamp(time_spec, reference_timestamp=previous_t) - - # enqueue: - self.logger.debug('Putting job %s with t=%s', job.name, time_spec) - self._queue.put((next_t, job)) - job._set_next_t(next_t) - - # Wake up the loop if this job should be executed next - self._set_next_peek(next_t) - - def run_once(self, callback, when, context=None, name=None): + def run_once(self, callback, when, context=None, name=None, job_kwargs=None): """Creates a new ``Job`` that runs once and adds it to the queue. Args: @@ -144,24 +141,34 @@ class JobQueue: Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`. name (:obj:`str`, optional): The name of the new job. Defaults to ``callback.__name__``. + job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the + ``scheduler.add_job()``. Returns: :class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job queue. """ - tzinfo = when.tzinfo if isinstance(when, (datetime.datetime, datetime.time)) else None + if not job_kwargs: + job_kwargs = {} - job = Job(callback, - repeat=False, - context=context, - name=name, - job_queue=self, - tzinfo=tzinfo) - self._put(job, time_spec=when) + name = name or callback.__name__ + job = Job(callback, context, name, self) + dt = self._parse_time_input(when, shift_day=True) + + j = self.scheduler.add_job(callback, + name=name, + trigger='date', + run_date=dt, + args=self._build_args(job), + timezone=dt.tzinfo or self.scheduler.timezone, + **job_kwargs) + + job.job = j return job - def run_repeating(self, callback, interval, first=None, context=None, name=None): + def run_repeating(self, callback, interval, first=None, last=None, context=None, name=None, + job_kwargs=None): """Creates a new ``Job`` that runs at specified intervals and adds it to the queue. Args: @@ -195,10 +202,21 @@ class JobQueue: then ``first.tzinfo`` will define ``Job.tzinfo``. Otherwise UTC will be assumed. Defaults to ``interval`` + last (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \ + :obj:`datetime.datetime` | :obj:`datetime.time`, optional): + Latest possible time for the job to run. This parameter will be interpreted + depending on its type. See ``first`` for details. + + If ``last`` is :obj:`datetime.datetime` or :obj:`datetime.time` type + and ``last.tzinfo`` is :obj:`None`, UTC will be assumed. + + Defaults to :obj:`None`. context (:obj:`object`, optional): Additional data needed for the callback function. Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`. name (:obj:`str`, optional): The name of the new job. Defaults to ``callback.__name__``. + job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the + ``scheduler.add_job()``. Returns: :class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job @@ -210,19 +228,35 @@ class JobQueue: to pin servers to UTC time, then time related behaviour can always be expected. """ - tzinfo = first.tzinfo if isinstance(first, (datetime.datetime, datetime.time)) else None + if not job_kwargs: + job_kwargs = {} - job = Job(callback, - interval=interval, - repeat=True, - context=context, - name=name, - job_queue=self, - tzinfo=tzinfo) - self._put(job, time_spec=first) + name = name or callback.__name__ + job = Job(callback, context, name, self) + + dt_first = self._parse_time_input(first) + dt_last = self._parse_time_input(last) + + if dt_last and dt_first and dt_last < dt_first: + raise ValueError("'last' must not be before 'first'!") + + if isinstance(interval, datetime.timedelta): + interval = interval.total_seconds() + + j = self.scheduler.add_job(callback, + trigger='interval', + args=self._build_args(job), + start_date=dt_first, + end_date=dt_last, + seconds=interval, + name=name, + **job_kwargs) + + job.job = j return job - def run_monthly(self, callback, when, day, context=None, name=None, day_is_strict=True): + def run_monthly(self, callback, when, day, context=None, name=None, day_is_strict=True, + job_kwargs=None): """Creates a new ``Job`` that runs on a monthly basis and adds it to the queue. Args: @@ -244,92 +278,55 @@ class JobQueue: ``callback.__name__``. day_is_strict (:obj:`bool`, optional): If :obj:`False` and day > month.days, will pick the last day in the month. Defaults to :obj:`True`. + job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the + ``scheduler.add_job()``. Returns: :class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job queue. """ - tzinfo = when.tzinfo if isinstance(when, (datetime.datetime, datetime.time)) else None - if 1 <= day <= 31: - next_dt = self._get_next_month_date(day, day_is_strict, when, allow_now=True) - job = Job(callback, repeat=False, context=context, name=name, job_queue=self, - is_monthly=True, day_is_strict=day_is_strict, tzinfo=tzinfo) - self._put(job, time_spec=next_dt) - return job + if not job_kwargs: + job_kwargs = {} + + name = name or callback.__name__ + job = Job(callback, context, name, self) + + if day_is_strict: + j = self.scheduler.add_job(callback, + trigger='cron', + args=self._build_args(job), + name=name, + day=day, + hour=when.hour, + minute=when.minute, + second=when.second, + timezone=when.tzinfo or self.scheduler.timezone, + **job_kwargs) else: - raise ValueError("The elements of the 'day' argument should be from 1 up to" - " and including 31") + trigger = OrTrigger([CronTrigger(day=day, + hour=when.hour, + minute=when.minute, + second=when.second, + timezone=when.tzinfo, + **job_kwargs), + CronTrigger(day='last', + hour=when.hour, + minute=when.minute, + second=when.second, + timezone=when.tzinfo or self.scheduler.timezone, + **job_kwargs)]) + j = self.scheduler.add_job(callback, + trigger=trigger, + args=self._build_args(job), + name=name, + **job_kwargs) - def _get_next_month_date(self, day, day_is_strict, when, allow_now=False): - """This method returns the date that the next monthly job should be scheduled. + job.job = j + return job - Args: - day (:obj:`int`): The day of the month the job should run. - day_is_strict (:obj:`bool`): - Specification as to whether the specified day of job should be strictly - respected. If day_is_strict is :obj:`True` it ignores months whereby the - specified date does not exist (e.g February 31st). If it set to :obj:`False`, - it returns the last valid date of the month instead. For example, - if the user runs a job on the 31st of every month, and sets - the day_is_strict variable to :obj:`False`, April, for example, - the job would run on April 30th. - when (:obj:`datetime.time`): Time of day at which the job should run. If the - timezone (``time.tzinfo``) is :obj:`None`, UTC will be assumed. - allow_now (:obj:`bool`): Whether executing the job right now is a feasible options. - For stability reasons, this defaults to :obj:`False`, but it needs to be :obj:`True` - on initializing a job. - - """ - dt = datetime.datetime.now(tz=when.tzinfo or datetime.timezone.utc) - dt_time = dt.time().replace(tzinfo=when.tzinfo) - days_in_current_month = calendar.monthrange(dt.year, dt.month)[1] - days_till_months_end = days_in_current_month - dt.day - if days_in_current_month < day: - # if the day does not exist in the current month (e.g Feb 31st) - if day_is_strict is False: - # set day as last day of month instead - next_dt = dt + datetime.timedelta(days=days_till_months_end) - else: - # else set as day in subsequent month. Subsequent month is - # guaranteed to have the date, if current month does not have the date. - next_dt = dt + datetime.timedelta(days=days_till_months_end + day) - else: - # if the day exists in the current month - if dt.day < day: - # day is upcoming - next_dt = dt + datetime.timedelta(day - dt.day) - elif dt.day > day or (dt.day == day and ((not allow_now and dt_time >= when) - or (allow_now and dt_time > when))): - # run next month if day has already passed - next_year = dt.year + 1 if dt.month == 12 else dt.year - next_month = 1 if dt.month == 12 else dt.month + 1 - days_in_next_month = calendar.monthrange(next_year, next_month)[1] - next_month_has_date = days_in_next_month >= day - if next_month_has_date: - next_dt = dt + datetime.timedelta(days=days_till_months_end + day) - elif day_is_strict: - # schedule the subsequent month if day is strict - next_dt = dt + datetime.timedelta( - days=days_till_months_end + days_in_next_month + day) - else: - # schedule in the next month last date if day is not strict - next_dt = dt + datetime.timedelta(days=days_till_months_end - + days_in_next_month) - - else: - # day is today but time has not yet come - next_dt = dt - - # Set the correct time - next_dt = next_dt.replace(hour=when.hour, minute=when.minute, second=when.second, - microsecond=when.microsecond) - # fold is new in Py3.6 - if hasattr(next_dt, 'fold'): - next_dt = next_dt.replace(fold=when.fold) - return next_dt - - def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None): + def run_daily(self, callback, time, days=Days.EVERY_DAY, context=None, name=None, + job_kwargs=None): """Creates a new ``Job`` that runs on a daily basis and adds it to the queue. Args: @@ -349,158 +346,112 @@ class JobQueue: Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`. name (:obj:`str`, optional): The name of the new job. Defaults to ``callback.__name__``. + job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the + ``scheduler.add_job()``. Returns: :class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job queue. Note: - Daily is just an alias for "24 Hours". That means that if DST changes during that - interval, the job might not run at the time one would expect. It is always recommended - to pin servers to UTC time, then time related behaviour can always be expected. + For a note about DST, please see the documentation of `APScheduler`_. + + .. _`APScheduler`: https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html + #daylight-saving-time-behavior """ - job = Job(callback, - interval=datetime.timedelta(days=1), - repeat=True, - days=days, - tzinfo=time.tzinfo, - context=context, - name=name, - job_queue=self) - self._put(job, time_spec=time) + if not job_kwargs: + job_kwargs = {} + + name = name or callback.__name__ + job = Job(callback, context, name, self) + + j = self.scheduler.add_job(callback, + name=name, + args=self._build_args(job), + trigger='cron', + day_of_week=','.join([str(d) for d in days]), + hour=time.hour, + minute=time.minute, + second=time.second, + timezone=time.tzinfo or self.scheduler.timezone, + **job_kwargs) + + job.job = j return job - def _set_next_peek(self, t): - # """ - # Set next peek if not defined or `t` is before next peek. - # In case the next peek was set, also trigger the `self.__tick` event. - # """ - with self.__next_peek_lock: - if not self._next_peek or self._next_peek > t: - self._next_peek = t - self.__tick.set() + def run_custom(self, callback, job_kwargs, context=None, name=None): + """Creates a new customly defined ``Job``. - def tick(self): - """Run all jobs that are due and re-enqueue them with their interval.""" - now = time.time() + Args: + callback (:obj:`callable`): The callback function that should be executed by the new + job. Callback signature for context based API: - self.logger.debug('Ticking jobs with t=%f', now) + ``def callback(CallbackContext)`` - while True: - try: - t, job = self._queue.get(False) - except Empty: - break + ``context.job`` is the :class:`telegram.ext.Job` instance. It can be used to access + its ``job.context`` or change it to a repeating job. + job_kwargs (:obj:`dict`): Arbitrary keyword arguments. Used as arguments for + ``scheduler.add_job``. + context (:obj:`object`, optional): Additional data needed for the callback function. + Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`. + name (:obj:`str`, optional): The name of the new job. Defaults to + ``callback.__name__``. - self.logger.debug('Peeked at %s with t=%f', job.name, t) + Returns: + :class:`telegram.ext.Job`: The new ``Job`` instance that has been added to the job + queue. - if t > now: - # We can get here in two conditions: - # 1. At the second or later pass of the while loop, after we've already - # processed the job(s) we were supposed to at this time. - # 2. At the first iteration of the loop only if `self.put()` had triggered - # `self.__tick` because `self._next_peek` wasn't set - self.logger.debug("Next task isn't due yet. Finished!") - self._queue.put((t, job)) - self._set_next_peek(t) - break + """ + name = name or callback.__name__ + job = Job(callback, context, name, self) - if job.removed: - self.logger.debug('Removing job %s', job.name) - continue + j = self.scheduler.add_job(callback, + args=self._build_args(job), + name=name, + **job_kwargs) - if job.enabled: - try: - current_week_day = datetime.datetime.now(job.tzinfo).date().weekday() - if current_week_day in job.days: - self.logger.debug('Running job %s', job.name) - job.run(self._dispatcher) - self._dispatcher.update_persistence() - - except Exception: - self.logger.exception('An uncaught error was raised while executing job %s', - job.name) - else: - self.logger.debug('Skipping disabled job %s', job.name) - - if job.repeat and not job.removed: - self._put(job, previous_t=t) - elif job.is_monthly and not job.removed: - dt = datetime.datetime.now(tz=job.tzinfo) - dt_time = dt.time().replace(tzinfo=job.tzinfo) - self._put(job, time_spec=self._get_next_month_date(dt.day, job.day_is_strict, - dt_time)) - else: - job._set_next_t(None) - self.logger.debug('Dropping non-repeating or removed job %s', job.name) + job.job = j + return job def start(self): """Starts the job_queue thread.""" - self.__start_lock.acquire() - - if not self._running: - self._running = True - self.__start_lock.release() - self.__thread = Thread(target=self._main_loop, - name="Bot:{}:job_queue".format(self._dispatcher.bot.id)) - self.__thread.start() - self.logger.debug('%s thread started', self.__class__.__name__) - else: - self.__start_lock.release() - - def _main_loop(self): - """ - Thread target of thread ``job_queue``. Runs in background and performs ticks on the job - queue. - - """ - while self._running: - # self._next_peek may be (re)scheduled during self.tick() or self.put() - with self.__next_peek_lock: - tmout = self._next_peek - time.time() if self._next_peek else None - self._next_peek = None - self.__tick.clear() - - self.__tick.wait(tmout) - - # If we were woken up by self.stop(), just bail out - if not self._running: - break - - self.tick() - - self.logger.debug('%s thread stopped', self.__class__.__name__) + if not self.scheduler.running: + self.scheduler.start() def stop(self): """Stops the thread.""" - with self.__start_lock: - self._running = False - - self.__tick.set() - if self.__thread is not None: - self.__thread.join() + if self.scheduler.running: + self.scheduler.shutdown() def jobs(self): """Returns a tuple of all jobs that are currently in the ``JobQueue``.""" - with self._queue.mutex: - return tuple(job[1] for job in self._queue.queue if job) + return tuple(Job.from_aps_job(job, self) for job in self.scheduler.get_jobs()) def get_jobs_by_name(self, name): """Returns a tuple of jobs with the given name that are currently in the ``JobQueue``""" - with self._queue.mutex: - return tuple(job[1] for job in self._queue.queue if job and job[1].name == name) + return tuple(job for job in self.jobs() if job.name == name) class Job: - """This class encapsulates a Job. + """This class is a convenience wrapper for the jobs held in a :class:`telegram.ext.JobQueue`. + With the current backend APScheduler, :attr:`job` holds a :class:`apscheduler.job.Job` + instance. + + Note: + * All attributes and instance methods of :attr:`job` are also directly available as + attributes/methods of the corresponding :class:`telegram.ext.Job` object. + * Two instances of :class:`telegram.ext.Job` are considered equal, if their corresponding + ``job`` attributes have the same ``id``. + * If :attr:`job` isn't passed on initialization, it must be set manually afterwards for + this :class:`telegram.ext.Job` to be useful. Attributes: callback (:obj:`callable`): The callback function that should be executed by the new job. context (:obj:`object`): Optional. Additional data needed for the callback function. name (:obj:`str`): Optional. The name of the new job. - is_monthly (:obj: `bool`): Optional. Indicates whether it is a monthly job. - day_is_strict (:obj: `bool`): Optional. Indicates whether the monthly jobs day is strict. + job_queue (:class:`telegram.ext.JobQueue`): Optional. The ``JobQueue`` this job belongs to. + job (:class:`apscheduler.job.Job`): Optional. The APS Job this job is a wrapper for. Args: callback (:obj:`callable`): The callback function that should be executed by the new job. @@ -510,125 +461,72 @@ class Job: a ``context.job`` is the :class:`telegram.ext.Job` instance. It can be used to access its ``job.context`` or change it to a repeating job. - interval (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`, optional): The time - interval between executions of the job. If it is an :obj:`int` or a :obj:`float`, - it will be interpreted as seconds. If you don't set this value, you must set - :attr:`repeat` to :obj:`False` and specify :attr:`time_spec` when you put the job into - the job queue. - repeat (:obj:`bool`, optional): If this job should be periodically execute its callback - function (:obj:`True`) or only once (:obj:`False`). Defaults to :obj:`True`. context (:obj:`object`, optional): Additional data needed for the callback function. Can be accessed through ``job.context`` in the callback. Defaults to :obj:`None`. name (:obj:`str`, optional): The name of the new job. Defaults to ``callback.__name__``. - days (Tuple[:obj:`int`], optional): Defines on which days of the week the job should run. - Defaults to ``Days.EVERY_DAY`` job_queue (:class:`telegram.ext.JobQueue`, optional): The ``JobQueue`` this job belongs to. Only optional for backward compatibility with ``JobQueue.put()``. - tzinfo (:obj:`datetime.tzinfo`, optional): timezone associated to this job. Used when - checking the day of the week to determine whether a job should run (only relevant when - ``days is not Days.EVERY_DAY``). Defaults to UTC. - is_monthly (:obj:`bool`, optional): If this job is supposed to be a monthly scheduled job. - Defaults to :obj:`False`. - day_is_strict (:obj:`bool`, optional): If :obj:`False` and day > month.days, will pick the - last day in the month. Defaults to :obj:`True`. Only relevant when ``is_monthly`` is - :obj:`True`. + job (:class:`apscheduler.job.Job`, optional): The APS Job this job is a wrapper for. """ def __init__(self, callback, - interval=None, - repeat=True, context=None, - days=Days.EVERY_DAY, name=None, job_queue=None, - tzinfo=None, - is_monthly=False, - day_is_strict=True): + job=None): self.callback = callback self.context = context self.name = name or callback.__name__ + self.job_queue = job_queue - self._repeat = None - self._interval = None - self.interval = interval - self._next_t = None - self.repeat = repeat - self.is_monthly = is_monthly - self.day_is_strict = day_is_strict + self._removed = False + self._enabled = False - self._days = None - self.days = days - self.tzinfo = tzinfo or datetime.timezone.utc - - self._job_queue = weakref.proxy(job_queue) if job_queue is not None else None - - self._remove = Event() - self._enabled = Event() - self._enabled.set() + self.job = job def run(self, dispatcher): - """Executes the callback function.""" - if dispatcher.use_context: - self.callback(CallbackContext.from_job(self, dispatcher)) - else: - self.callback(dispatcher.bot, self) + """Executes the callback function independently of the jobs schedule.""" + try: + if dispatcher.use_context: + self.callback(CallbackContext.from_job(self, dispatcher)) + else: + self.callback(dispatcher.bot, self) + except Exception as e: + try: + dispatcher.dispatch_error(None, e) + # Errors should not stop the thread. + except Exception: + dispatcher.logger.exception('An error was raised while processing the job and an ' + 'uncaught error was raised while handling the error ' + 'with an error_handler.') def schedule_removal(self): """ Schedules this job for removal from the ``JobQueue``. It will be removed without executing its callback function again. - """ - self._remove.set() - self._next_t = None + self.job.remove() + self._removed = True @property def removed(self): """:obj:`bool`: Whether this job is due to be removed.""" - return self._remove.is_set() + return self._removed @property def enabled(self): """:obj:`bool`: Whether this job is enabled.""" - return self._enabled.is_set() + return self._enabled @enabled.setter def enabled(self, status): if status: - self._enabled.set() + self.job.resume() else: - self._enabled.clear() - - @property - def interval(self): - """ - :obj:`int` | :obj:`float` | :obj:`datetime.timedelta`: Optional. The interval in which the - job will run. - - """ - return self._interval - - @interval.setter - def interval(self, interval): - if interval is None and self.repeat: - raise ValueError("The 'interval' can not be 'None' when 'repeat' is set to 'True'") - - if not (interval is None or isinstance(interval, (Number, datetime.timedelta))): - raise TypeError("The 'interval' must be of type 'datetime.timedelta'," - " 'int' or 'float'") - - self._interval = interval - - @property - def interval_seconds(self): - """:obj:`int`: The interval for this job in seconds.""" - interval = self.interval - if isinstance(interval, datetime.timedelta): - return interval.total_seconds() - else: - return interval + self.job.pause() + self._enabled = status @property def next_t(self): @@ -636,63 +534,25 @@ class Job: :obj:`datetime.datetime`: Datetime for the next job execution. Datetime is localized according to :attr:`tzinfo`. If job is removed or already ran it equals to :obj:`None`. - """ - return datetime.datetime.fromtimestamp(self._next_t, self.tzinfo) if self._next_t else None + return self.job.next_run_time - def _set_next_t(self, next_t): - if isinstance(next_t, datetime.datetime): - # Set timezone to UTC in case datetime is in local timezone. - next_t = next_t.astimezone(datetime.timezone.utc) - next_t = to_float_timestamp(next_t) - elif not (isinstance(next_t, Number) or next_t is None): - raise TypeError("The 'next_t' argument should be one of the following types: " - "'float', 'int', 'datetime.datetime' or 'NoneType'") - - self._next_t = next_t - - @property - def repeat(self): - """:obj:`bool`: Optional. If this job should periodically execute its callback function.""" - return self._repeat - - @repeat.setter - def repeat(self, repeat): - if self.interval is None and repeat: - raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set") - self._repeat = repeat - - @property - def days(self): - """Tuple[:obj:`int`]: Optional. Defines on which days of the week the job should run.""" - return self._days - - @days.setter - def days(self, days): - if not isinstance(days, tuple): - raise TypeError("The 'days' argument should be of type 'tuple'") - - if not all(isinstance(day, int) for day in days): - raise TypeError("The elements of the 'days' argument should be of type 'int'") - - if not all(0 <= day <= 6 for day in days): - raise ValueError("The elements of the 'days' argument should be from 0 up to and " - "including 6") - - self._days = days - - @property - def job_queue(self): - """:class:`telegram.ext.JobQueue`: Optional. The ``JobQueue`` this job belongs to.""" - return self._job_queue - - @job_queue.setter - def job_queue(self, job_queue): - # Property setter for backward compatibility with JobQueue.put() - if not self._job_queue: - self._job_queue = weakref.proxy(job_queue) + @classmethod + def from_aps_job(cls, job, job_queue): + # context based callbacks + if len(job.args) == 1: + context = job.args[0].job.context else: - raise RuntimeError("The 'job_queue' attribute can only be set once.") + context = job.args[1].context + return cls(job.func, context=context, name=job.name, job_queue=job_queue, job=job) + + def __getattr__(self, item): + return getattr(self.job, item) def __lt__(self, other): return False + + def __eq__(self, other): + if isinstance(other, self.__class__): + return self.id == other.id + return False diff --git a/tests/conftest.py b/tests/conftest.py index 5f00fa5f6..9a6d8fbe6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -25,6 +25,7 @@ from threading import Thread, Event from time import sleep import pytest +import pytz from telegram import (Bot, Message, User, Chat, MessageEntity, Update, InlineQuery, CallbackQuery, ShippingQuery, PreCheckoutQuery, @@ -271,14 +272,14 @@ def false_update(request): return Update(update_id=1, **request.param) -@pytest.fixture(params=[1, 2], ids=lambda h: 'UTC +{hour:0>2}:00'.format(hour=h)) -def utc_offset(request): - return datetime.timedelta(hours=request.param) +@pytest.fixture(params=['Europe/Berlin', 'Asia/Singapore', 'UTC']) +def tzinfo(request): + return pytz.timezone(request.param) @pytest.fixture() -def timezone(utc_offset): - return datetime.timezone(utc_offset) +def timezone(tzinfo): + return tzinfo def expect_bad_request(func, message, reason): diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index 936cb4c3f..d6dafce79 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -25,7 +25,7 @@ from telegram import (CallbackQuery, Chat, ChosenInlineResult, InlineQuery, Mess PreCheckoutQuery, ShippingQuery, Update, User, MessageEntity) from telegram.ext import (ConversationHandler, CommandHandler, CallbackQueryHandler, MessageHandler, Filters, InlineQueryHandler, CallbackContext, - DispatcherHandlerStop, TypeHandler) + DispatcherHandlerStop, TypeHandler, JobQueue) @pytest.fixture(scope='class') @@ -38,6 +38,15 @@ def user2(): return User(first_name='Mister Test', id=124, is_bot=False) +@pytest.fixture(autouse=True) +def start_stop_job_queue(dp): + dp.job_queue = JobQueue() + dp.job_queue.set_dispatcher(dp) + dp.job_queue.start() + yield + dp.job_queue.stop() + + def raise_dphs(func): def decorator(self, *args, **kwargs): result = func(self, *args, **kwargs) @@ -610,8 +619,7 @@ class TestConversationHandler: bot=bot) dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY - sleep(0.5) - dp.job_queue.tick() + sleep(0.65) assert handler.conversations.get((self.group.id, user1.id)) is None # Start state machine, do something, then reach timeout @@ -619,11 +627,9 @@ class TestConversationHandler: assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY message.text = '/brew' message.entities[0].length = len('/brew') - dp.job_queue.tick() dp.process_update(Update(update_id=2, message=message)) assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING - sleep(0.5) - dp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None def test_conversation_timeout_dispatcher_handler_stop(self, dp, bot, user1, caplog): @@ -645,8 +651,7 @@ class TestConversationHandler: with caplog.at_level(logging.WARNING): dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY - sleep(0.5) - dp.job_queue.tick() + sleep(0.8) assert handler.conversations.get((self.group.id, user1.id)) is None assert len(caplog.records) == 1 rec = caplog.records[-1] @@ -684,8 +689,7 @@ class TestConversationHandler: timeout_handler.callback = timeout_callback cdp.process_update(update) - sleep(0.5) - cdp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout @@ -708,24 +712,20 @@ class TestConversationHandler: dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY sleep(0.25) # t=.25 - dp.job_queue.tick() assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY message.text = '/brew' message.entities[0].length = len('/brew') dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING sleep(0.35) # t=.6 - dp.job_queue.tick() assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING message.text = '/pourCoffee' message.entities[0].length = len('/pourCoffee') dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user1.id)) == self.DRINKING sleep(.4) # t=1 - dp.job_queue.tick() assert handler.conversations.get((self.group.id, user1.id)) == self.DRINKING - sleep(.1) # t=1.1 - dp.job_queue.tick() + sleep(.2) # t=1.2 assert handler.conversations.get((self.group.id, user1.id)) is None def test_conversation_timeout_two_users(self, dp, bot, user1, user2): @@ -744,16 +744,13 @@ class TestConversationHandler: message.entities[0].length = len('/brew') message.entities[0].length = len('/brew') message.from_user = user2 - dp.job_queue.tick() dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user2.id)) is None message.text = '/start' message.entities[0].length = len('/start') - dp.job_queue.tick() dp.process_update(Update(update_id=0, message=message)) assert handler.conversations.get((self.group.id, user2.id)) == self.THIRSTY - sleep(0.5) - dp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert handler.conversations.get((self.group.id, user2.id)) is None @@ -776,8 +773,7 @@ class TestConversationHandler: message.text = '/brew' message.entities[0].length = len('/brew') dp.process_update(Update(update_id=0, message=message)) - sleep(0.5) - dp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout @@ -786,8 +782,7 @@ class TestConversationHandler: message.text = '/start' message.entities[0].length = len('/start') dp.process_update(Update(update_id=1, message=message)) - sleep(0.5) - dp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout @@ -800,8 +795,7 @@ class TestConversationHandler: message.text = '/startCoding' message.entities[0].length = len('/startCoding') dp.process_update(Update(update_id=0, message=message)) - sleep(0.5) - dp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert not self.is_timeout @@ -824,8 +818,7 @@ class TestConversationHandler: message.text = '/brew' message.entities[0].length = len('/brew') cdp.process_update(Update(update_id=0, message=message)) - sleep(0.5) - cdp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout @@ -834,8 +827,7 @@ class TestConversationHandler: message.text = '/start' message.entities[0].length = len('/start') cdp.process_update(Update(update_id=1, message=message)) - sleep(0.5) - cdp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout @@ -848,8 +840,7 @@ class TestConversationHandler: message.text = '/startCoding' message.entities[0].length = len('/startCoding') cdp.process_update(Update(update_id=0, message=message)) - sleep(0.5) - cdp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert not self.is_timeout @@ -865,7 +856,6 @@ class TestConversationHandler: def slowbrew(_bot, update): sleep(0.25) # Let's give to the original timeout a chance to execute - dp.job_queue.tick() sleep(0.25) # By returning None we do not override the conversation state so # we can see if the timeout has been executed @@ -887,16 +877,13 @@ class TestConversationHandler: bot=bot) dp.process_update(Update(update_id=0, message=message)) sleep(0.25) - dp.job_queue.tick() message.text = '/slowbrew' message.entities[0].length = len('/slowbrew') dp.process_update(Update(update_id=0, message=message)) - dp.job_queue.tick() assert handler.conversations.get((self.group.id, user1.id)) is not None assert not self.is_timeout - sleep(0.5) - dp.job_queue.tick() + sleep(0.6) assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout diff --git a/tests/test_helpers.py b/tests/test_helpers.py index fd74f496b..7aa62f9b3 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -86,9 +86,10 @@ class TestHelpers: """Conversion from timezone-aware datetime to timestamp""" # we're parametrizing this with two different UTC offsets to exclude the possibility # of an xpass when the test is run in a timezone with the same UTC offset - datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10**5, tzinfo=timezone) + test_datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10**5) + datetime = timezone.localize(test_datetime) assert (helpers.to_float_timestamp(datetime) - == 1573431976.1 - timezone.utcoffset(None).total_seconds()) + == 1573431976.1 - timezone.utcoffset(test_datetime).total_seconds()) def test_to_float_timestamp_absolute_no_reference(self): """A reference timestamp is only relevant for relative time specifications""" @@ -116,14 +117,15 @@ class TestHelpers: """Conversion from timezone-aware time-of-day specification to timestamp""" # we're parametrizing this with two different UTC offsets to exclude the possibility # of an xpass when the test is run in a timezone with the same UTC offset - utc_offset = timezone.utcoffset(None) ref_datetime = dtm.datetime(1970, 1, 1, 12) + utc_offset = timezone.utcoffset(ref_datetime) ref_t, time_of_day = _datetime_to_float_timestamp(ref_datetime), ref_datetime.time() + aware_time_of_day = timezone.localize(ref_datetime).timetz() # first test that naive time is assumed to be utc: assert helpers.to_float_timestamp(time_of_day, ref_t) == pytest.approx(ref_t) # test that by setting the timezone the timestamp changes accordingly: - assert (helpers.to_float_timestamp(time_of_day.replace(tzinfo=timezone), ref_t) + assert (helpers.to_float_timestamp(aware_time_of_day, ref_t) == pytest.approx(ref_t + (-utc_offset.total_seconds() % (24 * 60 * 60)))) @pytest.mark.parametrize('time_spec', RELATIVE_TIME_SPECS, ids=str) @@ -149,9 +151,10 @@ class TestHelpers: def test_from_timestamp_aware(self, timezone): # we're parametrizing this with two different UTC offsets to exclude the possibility # of an xpass when the test is run in a timezone with the same UTC offset - datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10**5, tzinfo=timezone) - assert (helpers.from_timestamp(1573431976.1 - timezone.utcoffset(None).total_seconds()) - == datetime) + test_datetime = dtm.datetime(2019, 11, 11, 0, 26, 16, 10 ** 5) + datetime = timezone.localize(test_datetime) + assert (helpers.from_timestamp( + 1573431976.1 - timezone.utcoffset(test_datetime).total_seconds()) == datetime) def test_create_deep_linked_url(self): username = 'JamesTheMock' diff --git a/tests/test_inputfile.py b/tests/test_inputfile.py index e1fd01ceb..b961ff527 100644 --- a/tests/test_inputfile.py +++ b/tests/test_inputfile.py @@ -51,8 +51,7 @@ class TestInputFile: def test_mimetypes(self): # Only test a few to make sure logic works okay assert InputFile(open('tests/data/telegram.jpg', 'rb')).mimetype == 'image/jpeg' - if sys.version_info >= (3, 5): - assert InputFile(open('tests/data/telegram.webp', 'rb')).mimetype == 'image/webp' + assert InputFile(open('tests/data/telegram.webp', 'rb')).mimetype == 'image/webp' assert InputFile(open('tests/data/telegram.mp3', 'rb')).mimetype == 'audio/mpeg' # Test guess from file diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 24328d429..995dc4a3e 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -18,15 +18,17 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. import calendar import datetime as dtm +import logging import os import time from queue import Queue from time import sleep import pytest +import pytz +from apscheduler.schedulers import SchedulerNotRunningError from flaky import flaky from telegram.ext import JobQueue, Updater, Job, CallbackContext -from telegram.utils.deprecate import TelegramDeprecationWarning @pytest.fixture(scope='function') @@ -44,16 +46,18 @@ def job_queue(bot, _dp): class TestJobQueue: result = 0 job_time = 0 + received_error = None @pytest.fixture(autouse=True) def reset(self): self.result = 0 self.job_time = 0 + self.received_error = None def job_run_once(self, bot, job): self.result += 1 - def job_with_exception(self, bot, job): + def job_with_exception(self, bot, job=None): raise Exception('Test Error') def job_remove_self(self, bot, job): @@ -74,32 +78,32 @@ class TestJobQueue: and context.chat_data is None and context.user_data is None and isinstance(context.bot_data, dict) - and context.job_queue is context.job.job_queue): + and context.job_queue is not context.job.job_queue): self.result += 1 + def error_handler(self, bot, update, error): + self.received_error = str(error) + + def error_handler_context(self, update, context): + self.received_error = str(context.error) + + def error_handler_raise_error(self, *args): + raise Exception('Failing bigly') + def test_run_once(self, job_queue): job_queue.run_once(self.job_run_once, 0.01) sleep(0.02) assert self.result == 1 def test_run_once_timezone(self, job_queue, timezone): - """Test the correct handling of aware datetimes. - Set the target datetime to utcnow + x hours (naive) with the timezone set to utc + x hours, - which is equivalent to now. - """ + """Test the correct handling of aware datetimes""" # we're parametrizing this with two different UTC offsets to exclude the possibility # of an xpass when the test is run in a timezone with the same UTC offset - when = (dtm.datetime.utcnow() + timezone.utcoffset(None)).replace(tzinfo=timezone) + when = dtm.datetime.now(timezone) job_queue.run_once(self.job_run_once, when) sleep(0.001) assert self.result == 1 - def test_run_once_no_time_spec(self, job_queue): - # test that an appropiate exception is raised if a job is attempted to be scheduled - # without specifying a time - with pytest.raises(ValueError): - job_queue.run_once(self.job_run_once, when=None) - def test_job_with_context(self, job_queue): job_queue.run_once(self.job_run_once_with_context, 0.01, context=5) sleep(0.02) @@ -117,18 +121,43 @@ class TestJobQueue: sleep(0.07) assert self.result == 1 - def test_run_repeating_first_immediate(self, job_queue): - job_queue.run_repeating(self.job_run_once, 0.1, first=0) - sleep(0.05) - assert self.result == 1 - def test_run_repeating_first_timezone(self, job_queue, timezone): """Test correct scheduling of job when passing a timezone-aware datetime as ``first``""" - first = (dtm.datetime.utcnow() + timezone.utcoffset(None)).replace(tzinfo=timezone) - job_queue.run_repeating(self.job_run_once, 0.05, first=first) - sleep(0.001) + job_queue.run_repeating(self.job_run_once, 0.1, + first=dtm.datetime.now(timezone) + dtm.timedelta(seconds=0.05)) + sleep(0.1) assert self.result == 1 + def test_run_repeating_last(self, job_queue): + job_queue.run_repeating(self.job_run_once, 0.05, last=0.06) + sleep(0.1) + assert self.result == 1 + sleep(0.1) + assert self.result == 1 + + def test_run_repeating_last_timezone(self, job_queue, timezone): + """Test correct scheduling of job when passing a timezone-aware datetime as ``first``""" + job_queue.run_repeating(self.job_run_once, 0.05, + last=dtm.datetime.now(timezone) + dtm.timedelta(seconds=0.06)) + sleep(0.1) + assert self.result == 1 + sleep(0.1) + assert self.result == 1 + + def test_run_repeating_last_before_first(self, job_queue): + with pytest.raises(ValueError, match="'last' must not be before 'first'!"): + job_queue.run_repeating(self.job_run_once, 0.05, first=1, last=0.5) + + def test_run_repeating_timedelta(self, job_queue): + job_queue.run_repeating(self.job_run_once, dtm.timedelta(minutes=3.3333e-4)) + sleep(0.05) + assert self.result == 2 + + def test_run_custom(self, job_queue): + job_queue.run_custom(self.job_run_once, {'trigger': 'interval', 'seconds': 0.02}) + sleep(0.05) + assert self.result == 2 + def test_multiple(self, job_queue): job_queue.run_once(self.job_run_once, 0.01) job_queue.run_once(self.job_run_once, 0.02) @@ -198,7 +227,10 @@ class TestJobQueue: sleep(1) assert self.result == 1 finally: - u.stop() + try: + u.stop() + except SchedulerNotRunningError: + pass def test_time_unit_int(self, job_queue): # Testing seconds in int @@ -221,9 +253,9 @@ class TestJobQueue: def test_time_unit_dt_datetime(self, job_queue): # Testing running at a specific datetime - delta, now = dtm.timedelta(seconds=0.05), time.time() - when = dtm.datetime.utcfromtimestamp(now) + delta - expected_time = now + delta.total_seconds() + delta, now = dtm.timedelta(seconds=0.05), dtm.datetime.now(pytz.utc) + when = now + delta + expected_time = (now + delta).timestamp() job_queue.run_once(self.job_datetime_tests, when) sleep(0.06) @@ -231,9 +263,10 @@ class TestJobQueue: def test_time_unit_dt_time_today(self, job_queue): # Testing running at a specific time today - delta, now = 0.05, time.time() - when = (dtm.datetime.utcfromtimestamp(now) + dtm.timedelta(seconds=delta)).time() - expected_time = now + delta + delta, now = 0.05, dtm.datetime.now(pytz.utc) + expected_time = now + dtm.timedelta(seconds=delta) + when = expected_time.time() + expected_time = expected_time.timestamp() job_queue.run_once(self.job_datetime_tests, when) sleep(0.06) @@ -242,262 +275,197 @@ class TestJobQueue: def test_time_unit_dt_time_tomorrow(self, job_queue): # Testing running at a specific time that has passed today. Since we can't wait a day, we # test if the job's next scheduled execution time has been calculated correctly - delta, now = -2, time.time() - when = (dtm.datetime.utcfromtimestamp(now) + dtm.timedelta(seconds=delta)).time() - expected_time = now + delta + 60 * 60 * 24 + delta, now = -2, dtm.datetime.now(pytz.utc) + when = (now + dtm.timedelta(seconds=delta)).time() + expected_time = (now + dtm.timedelta(seconds=delta, days=1)).timestamp() job_queue.run_once(self.job_datetime_tests, when) - assert job_queue._queue.get(False)[0] == pytest.approx(expected_time) + scheduled_time = job_queue.jobs()[0].next_t.timestamp() + assert scheduled_time == pytest.approx(expected_time) def test_run_daily(self, job_queue): - delta, now = 0.1, time.time() - time_of_day = (dtm.datetime.utcfromtimestamp(now) + dtm.timedelta(seconds=delta)).time() - expected_reschedule_time = now + delta + 24 * 60 * 60 + delta, now = 1, dtm.datetime.now(pytz.utc) + time_of_day = (now + dtm.timedelta(seconds=delta)).time() + expected_reschedule_time = (now + dtm.timedelta(seconds=delta, days=1)).timestamp() job_queue.run_daily(self.job_run_once, time_of_day) - sleep(0.2) - assert self.result == 1 - assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time) - - def test_run_daily_with_timezone(self, job_queue): - """test that the weekday is retrieved based on the job's timezone - We set a job to run at the current UTC time of day (plus a small delay buffer) with a - timezone that is---approximately (see below)---UTC +24, and set it to run on the weekday - after the current UTC weekday. The job should therefore be executed now (because in UTC+24, - the time of day is the same as the current weekday is the one after the current UTC - weekday). - """ - now = time.time() - utcnow = dtm.datetime.utcfromtimestamp(now) - delta = 0.1 - - # must subtract one minute because the UTC offset has to be strictly less than 24h - # thus this test will xpass if run in the interval [00:00, 00:01) UTC time - # (because target time will be 23:59 UTC, so local and target weekday will be the same) - target_tzinfo = dtm.timezone(dtm.timedelta(days=1, minutes=-1)) - target_datetime = (utcnow + dtm.timedelta(days=1, minutes=-1, seconds=delta)).replace( - tzinfo=target_tzinfo) - target_time = target_datetime.timetz() - target_weekday = target_datetime.date().weekday() - expected_reschedule_time = now + delta + 24 * 60 * 60 - - job_queue.run_daily(self.job_run_once, time=target_time, days=(target_weekday,)) sleep(delta + 0.1) assert self.result == 1 - assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time) + scheduled_time = job_queue.jobs()[0].next_t.timestamp() + assert scheduled_time == pytest.approx(expected_reschedule_time) - def test_run_monthly(self, job_queue): - delta, now = 0.1, time.time() - date_time = dtm.datetime.utcfromtimestamp(now) - time_of_day = (date_time + dtm.timedelta(seconds=delta)).time() - expected_reschedule_time = now + delta + def test_run_monthly(self, job_queue, timezone): + delta, now = 1, dtm.datetime.now(timezone) + expected_reschedule_time = now + dtm.timedelta(seconds=delta) + time_of_day = expected_reschedule_time.time().replace(tzinfo=timezone) - day = date_time.day - expected_reschedule_time += calendar.monthrange(date_time.year, - date_time.month)[1] * 24 * 60 * 60 + day = now.day + expected_reschedule_time += dtm.timedelta(calendar.monthrange(now.year, now.month)[1]) + expected_reschedule_time = expected_reschedule_time.timestamp() job_queue.run_monthly(self.job_run_once, time_of_day, day) - sleep(0.2) - assert self.result == 1 - assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time) - - def test_run_monthly_and_not_strict(self, job_queue): - # This only really tests something in months with < 31 days. - # But the trouble of patching datetime is probably not worth it - - delta, now = 0.1, time.time() - date_time = dtm.datetime.utcfromtimestamp(now) - time_of_day = (date_time + dtm.timedelta(seconds=delta)).time() - expected_reschedule_time = now + delta - - day = date_time.day - date_time += dtm.timedelta(calendar.monthrange(date_time.year, - date_time.month)[1] - day) - # next job should be scheduled on last day of month if day_is_strict is False - expected_reschedule_time += (calendar.monthrange(date_time.year, - date_time.month)[1] - day) * 24 * 60 * 60 - - job_queue.run_monthly(self.job_run_once, time_of_day, 31, day_is_strict=False) - assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time) - - def test_run_monthly_with_timezone(self, job_queue): - """test that the day is retrieved based on the job's timezone - We set a job to run at the current UTC time of day (plus a small delay buffer) with a - timezone that is---approximately (see below)---UTC +24, and set it to run on the weekday - after the current UTC weekday. The job should therefore be executed now (because in UTC+24, - the time of day is the same as the current weekday is the one after the current UTC - weekday). - """ - now = time.time() - utcnow = dtm.datetime.utcfromtimestamp(now) - delta = 0.1 - - # must subtract one minute because the UTC offset has to be strictly less than 24h - # thus this test will xpass if run in the interval [00:00, 00:01) UTC time - # (because target time will be 23:59 UTC, so local and target weekday will be the same) - target_tzinfo = dtm.timezone(dtm.timedelta(days=1, minutes=-1)) - target_datetime = (utcnow + dtm.timedelta(days=1, minutes=-1, seconds=delta)).replace( - tzinfo=target_tzinfo) - target_time = target_datetime.timetz() - target_day = target_datetime.day - expected_reschedule_time = now + delta - expected_reschedule_time += calendar.monthrange(target_datetime.year, - target_datetime.month)[1] * 24 * 60 * 60 - - job_queue.run_monthly(self.job_run_once, target_time, target_day) sleep(delta + 0.1) assert self.result == 1 - assert job_queue._queue.get(False)[0] == pytest.approx(expected_reschedule_time) + scheduled_time = job_queue.jobs()[0].next_t.timestamp() + assert scheduled_time == pytest.approx(expected_reschedule_time) - def test_warnings(self, job_queue): - j = Job(self.job_run_once, repeat=False) - with pytest.raises(ValueError, match='can not be set to'): - j.repeat = True - j.interval = 15 - assert j.interval_seconds == 15 - j.repeat = True - with pytest.raises(ValueError, match='can not be'): - j.interval = None - j.repeat = False - with pytest.raises(TypeError, match='must be of type'): - j.interval = 'every 3 minutes' - j.interval = 15 - assert j.interval_seconds == 15 + def test_run_monthly_non_strict_day(self, job_queue, timezone): + delta, now = 1, dtm.datetime.now(timezone) + expected_reschedule_time = now + dtm.timedelta(seconds=delta) + time_of_day = expected_reschedule_time.time().replace(tzinfo=timezone) - with pytest.raises(TypeError, match='argument should be of type'): - j.days = 'every day' - with pytest.raises(TypeError, match='The elements of the'): - j.days = ('mon', 'wed') - with pytest.raises(ValueError, match='from 0 up to and'): - j.days = (0, 6, 12, 14) + expected_reschedule_time += (dtm.timedelta(calendar.monthrange(now.year, now.month)[1]) + - dtm.timedelta(days=now.day)) + # Adjust the hour for the special case that between now & end of month a DST switch happens + expected_reschedule_time = timezone.normalize(expected_reschedule_time) + expected_reschedule_time += dtm.timedelta( + hours=time_of_day.hour - expected_reschedule_time.hour) + expected_reschedule_time = expected_reschedule_time.timestamp() - with pytest.raises(TypeError, match='argument should be one of the'): - j._set_next_t('tomorrow') + job_queue.run_monthly(self.job_run_once, time_of_day, 31, day_is_strict=False) + scheduled_time = job_queue.jobs()[0].next_t.timestamp() + assert scheduled_time == pytest.approx(expected_reschedule_time) - def test_get_jobs(self, job_queue): - job1 = job_queue.run_once(self.job_run_once, 10, name='name1') - job2 = job_queue.run_once(self.job_run_once, 10, name='name1') - job3 = job_queue.run_once(self.job_run_once, 10, name='name2') + @pytest.mark.parametrize('use_context', [True, False]) + def test_get_jobs(self, job_queue, use_context): + job_queue._dispatcher.use_context = use_context + if use_context: + callback = self.job_context_based_callback + else: + callback = self.job_run_once + + job1 = job_queue.run_once(callback, 10, name='name1') + job2 = job_queue.run_once(callback, 10, name='name1') + job3 = job_queue.run_once(callback, 10, name='name2') assert job_queue.jobs() == (job1, job2, job3) assert job_queue.get_jobs_by_name('name1') == (job1, job2) assert job_queue.get_jobs_by_name('name2') == (job3,) - def test_bot_in_init_deprecation(self, bot): - with pytest.warns(TelegramDeprecationWarning): - JobQueue(bot) - def test_context_based_callback(self, job_queue): - job_queue.run_once(self.job_context_based_callback, 0.01, context=2) + job_queue._dispatcher.use_context = True + job_queue.run_once(self.job_context_based_callback, 0.01, context=2) sleep(0.03) - assert self.result == 0 - - def test_job_default_tzinfo(self, job_queue): - """Test that default tzinfo is always set to UTC""" - job_1 = job_queue.run_once(self.job_run_once, 0.01) - job_2 = job_queue.run_repeating(self.job_run_once, 10) - job_3 = job_queue.run_daily(self.job_run_once, time=dtm.time(hour=15)) - - jobs = [job_1, job_2, job_3] - - for job in jobs: - assert job.tzinfo == dtm.timezone.utc - - def test_job_next_t_property(self, job_queue): - # Testing: - # - next_t values match values from self._queue.queue (for run_once and run_repeating jobs) - # - next_t equals None if job is removed or if it's already ran - - job1 = job_queue.run_once(self.job_run_once, 0.06, name='run_once job') - job2 = job_queue.run_once(self.job_run_once, 0.06, name='canceled run_once job') - job_queue.run_repeating(self.job_run_once, 0.04, name='repeatable job') - - sleep(0.05) - job2.schedule_removal() - - with job_queue._queue.mutex: - for t, job in job_queue._queue.queue: - t = dtm.datetime.fromtimestamp(t, job.tzinfo) - - if job.removed: - assert job.next_t is None - else: - assert job.next_t == t - assert self.result == 1 - sleep(0.02) + job_queue._dispatcher.use_context = False + @pytest.mark.parametrize('use_context', [True, False]) + def test_job_run(self, _dp, use_context): + _dp.use_context = use_context + job_queue = JobQueue() + job_queue.set_dispatcher(_dp) + if use_context: + job = job_queue.run_repeating(self.job_context_based_callback, 0.02, context=2) + else: + job = job_queue.run_repeating(self.job_run_once, 0.02, context=2) + assert self.result == 0 + job.run(_dp) + assert self.result == 1 + + def test_enable_disable_job(self, job_queue): + job = job_queue.run_repeating(self.job_run_once, 0.02) + sleep(0.05) assert self.result == 2 - assert job1.next_t is None - assert job2.next_t is None + job.enabled = False + assert not job.enabled + sleep(0.05) + assert self.result == 2 + job.enabled = True + assert job.enabled + sleep(0.05) + assert self.result == 4 - def test_job_set_next_t(self, job_queue): - # Testing next_t setter for 'datetime.datetime' values + def test_remove_job(self, job_queue): + job = job_queue.run_repeating(self.job_run_once, 0.02) + sleep(0.05) + assert self.result == 2 + assert not job.removed + job.schedule_removal() + assert job.removed + sleep(0.05) + assert self.result == 2 - job = job_queue.run_once(self.job_run_once, 0.05) + def test_job_lt_eq(self, job_queue): + job = job_queue.run_repeating(self.job_run_once, 0.02) + assert not job == job_queue + assert not job < job - t = dtm.datetime.now(tz=dtm.timezone(dtm.timedelta(hours=12))) - job._set_next_t(t) - job.tzinfo = dtm.timezone(dtm.timedelta(hours=5)) - assert job.next_t == t.astimezone(job.tzinfo) + def test_dispatch_error(self, job_queue, dp): + dp.add_error_handler(self.error_handler) - def test_passing_tzinfo_to_job(self, job_queue): - """Test that tzinfo is correctly passed to job with run_once, run_daily, run_repeating - and run_monthly methods""" + job = job_queue.run_once(self.job_with_exception, 0.05) + sleep(.1) + assert self.received_error == 'Test Error' + self.received_error = None + job.run(dp) + assert self.received_error == 'Test Error' - when_dt_tz_specific = dtm.datetime.now( - tz=dtm.timezone(dtm.timedelta(hours=12)) - ) + dtm.timedelta(seconds=2) - when_dt_tz_utc = dtm.datetime.now() + dtm.timedelta(seconds=2) - job_once1 = job_queue.run_once(self.job_run_once, when_dt_tz_specific) - job_once2 = job_queue.run_once(self.job_run_once, when_dt_tz_utc) + # Remove handler + dp.remove_error_handler(self.error_handler) + self.received_error = None - when_time_tz_specific = (dtm.datetime.now( - tz=dtm.timezone(dtm.timedelta(hours=12)) - ) + dtm.timedelta(seconds=2)).timetz() - when_time_tz_utc = (dtm.datetime.now() + dtm.timedelta(seconds=2)).timetz() - job_once3 = job_queue.run_once(self.job_run_once, when_time_tz_specific) - job_once4 = job_queue.run_once(self.job_run_once, when_time_tz_utc) + job = job_queue.run_once(self.job_with_exception, 0.05) + sleep(.1) + assert self.received_error is None + job.run(dp) + assert self.received_error is None - first_dt_tz_specific = dtm.datetime.now( - tz=dtm.timezone(dtm.timedelta(hours=12)) - ) + dtm.timedelta(seconds=2) - first_dt_tz_utc = dtm.datetime.now() + dtm.timedelta(seconds=2) - job_repeating1 = job_queue.run_repeating( - self.job_run_once, 2, first=first_dt_tz_specific) - job_repeating2 = job_queue.run_repeating( - self.job_run_once, 2, first=first_dt_tz_utc) + def test_dispatch_error_context(self, job_queue, cdp): + cdp.add_error_handler(self.error_handler_context) - first_time_tz_specific = (dtm.datetime.now( - tz=dtm.timezone(dtm.timedelta(hours=12)) - ) + dtm.timedelta(seconds=2)).timetz() - first_time_tz_utc = (dtm.datetime.now() + dtm.timedelta(seconds=2)).timetz() - job_repeating3 = job_queue.run_repeating( - self.job_run_once, 2, first=first_time_tz_specific) - job_repeating4 = job_queue.run_repeating( - self.job_run_once, 2, first=first_time_tz_utc) + job = job_queue.run_once(self.job_with_exception, 0.05) + sleep(.1) + assert self.received_error == 'Test Error' + self.received_error = None + job.run(cdp) + assert self.received_error == 'Test Error' - time_tz_specific = (dtm.datetime.now( - tz=dtm.timezone(dtm.timedelta(hours=12)) - ) + dtm.timedelta(seconds=2)).timetz() - time_tz_utc = (dtm.datetime.now() + dtm.timedelta(seconds=2)).timetz() - job_daily1 = job_queue.run_daily(self.job_run_once, time_tz_specific) - job_daily2 = job_queue.run_daily(self.job_run_once, time_tz_utc) + # Remove handler + cdp.remove_error_handler(self.error_handler_context) + self.received_error = None - job_monthly1 = job_queue.run_monthly(self.job_run_once, time_tz_specific, 1) - job_monthly2 = job_queue.run_monthly(self.job_run_once, time_tz_utc, 1) + job = job_queue.run_once(self.job_with_exception, 0.05) + sleep(.1) + assert self.received_error is None + job.run(cdp) + assert self.received_error is None - assert job_once1.tzinfo == when_dt_tz_specific.tzinfo - assert job_once2.tzinfo == dtm.timezone.utc - assert job_once3.tzinfo == when_time_tz_specific.tzinfo - assert job_once4.tzinfo == dtm.timezone.utc - assert job_repeating1.tzinfo == first_dt_tz_specific.tzinfo - assert job_repeating2.tzinfo == dtm.timezone.utc - assert job_repeating3.tzinfo == first_time_tz_specific.tzinfo - assert job_repeating4.tzinfo == dtm.timezone.utc - assert job_daily1.tzinfo == time_tz_specific.tzinfo - assert job_daily2.tzinfo == dtm.timezone.utc - assert job_monthly1.tzinfo == time_tz_specific.tzinfo - assert job_monthly2.tzinfo == dtm.timezone.utc + def test_dispatch_error_that_raises_errors(self, job_queue, dp, caplog): + dp.add_error_handler(self.error_handler_raise_error) + + with caplog.at_level(logging.ERROR): + job = job_queue.run_once(self.job_with_exception, 0.05) + sleep(.1) + assert len(caplog.records) == 1 + rec = caplog.records[-1] + assert 'processing the job' in rec.msg + assert 'uncaught error was raised while handling' in rec.msg + caplog.clear() + + with caplog.at_level(logging.ERROR): + job.run(dp) + assert len(caplog.records) == 1 + rec = caplog.records[-1] + assert 'processing the job' in rec.msg + assert 'uncaught error was raised while handling' in rec.msg + caplog.clear() + + # Remove handler + dp.remove_error_handler(self.error_handler_raise_error) + self.received_error = None + + with caplog.at_level(logging.ERROR): + job = job_queue.run_once(self.job_with_exception, 0.05) + sleep(.1) + assert len(caplog.records) == 1 + rec = caplog.records[-1] + assert 'No error handlers are registered' in rec.msg + caplog.clear() + + with caplog.at_level(logging.ERROR): + job.run(dp) + assert len(caplog.records) == 1 + rec = caplog.records[-1] + assert 'No error handlers are registered' in rec.msg diff --git a/tests/test_persistence.py b/tests/test_persistence.py index 374122e57..9cc4d74e9 100644 --- a/tests/test_persistence.py +++ b/tests/test_persistence.py @@ -17,7 +17,6 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. import signal -import sys from telegram.utils.helpers import encode_conversations_to_json @@ -1169,7 +1168,6 @@ class TestDictPersistence: assert dict_persistence.bot_data == bot_data assert dict_persistence.conversations == conversations - @pytest.mark.skipif(sys.version_info < (3, 6), reason="dicts are not ordered in py<=3.5") def test_json_outputs(self, user_data_json, chat_data_json, bot_data_json, conversations_json): dict_persistence = DictPersistence(user_data_json=user_data_json, chat_data_json=chat_data_json, @@ -1180,7 +1178,6 @@ class TestDictPersistence: assert dict_persistence.bot_data_json == bot_data_json assert dict_persistence.conversations_json == conversations_json - @pytest.mark.skipif(sys.version_info < (3, 6), reason="dicts are not ordered in py<=3.5") def test_json_changes(self, user_data, user_data_json, chat_data, chat_data_json, bot_data, bot_data_json, conversations, conversations_json): diff --git a/tests/test_updater.py b/tests/test_updater.py index 832d88be2..81f2a5498 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -513,10 +513,14 @@ class TestUpdater: with caplog.at_level(logging.INFO): updater.idle() - rec = caplog.records[-1] + rec = caplog.records[-2] assert rec.msg.startswith('Received signal {}'.format(signal.SIGTERM)) assert rec.levelname == 'INFO' + rec = caplog.records[-1] + assert rec.msg.startswith('Scheduler has been shut down') + assert rec.levelname == 'INFO' + # If we get this far, idle() ran through sleep(.5) assert updater.running is False