python-telegram-bot/tests/test_jobqueue.py

522 lines
18 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
2022-01-03 08:15:18 +01:00
# Copyright (C) 2015-2022
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import calendar
import datetime as dtm
import logging
import os
import platform
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
import time
from queue import Queue
from time import sleep
import pytest
import pytz
from apscheduler.schedulers import SchedulerNotRunningError
from flaky import flaky
from telegram.ext import (
JobQueue,
Job,
CallbackContext,
ContextTypes,
DispatcherBuilder,
)
class CustomContext(CallbackContext):
pass
@pytest.fixture(scope='function')
def job_queue(bot, _dp):
jq = JobQueue()
jq.set_dispatcher(_dp)
jq.start()
yield jq
jq.stop()
@pytest.mark.skipif(
os.getenv('GITHUB_ACTIONS', False) and platform.system() in ['Windows', 'Darwin'],
reason="On Windows & MacOS precise timings are not accurate.",
)
@flaky(10, 1) # Timings aren't quite perfect
class TestJobQueue:
result = 0
job_time = 0
received_error = None
@pytest.fixture(autouse=True)
def reset(self):
self.result = 0
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
self.job_time = 0
self.received_error = None
2021-08-29 18:15:04 +02:00
def job_run_once(self, context):
self.result += 1
2021-08-29 18:15:04 +02:00
def job_with_exception(self, context):
raise Exception('Test Error')
2016-01-04 02:36:15 +01:00
2021-08-29 18:15:04 +02:00
def job_remove_self(self, context):
self.result += 1
2021-08-29 18:15:04 +02:00
context.job.schedule_removal()
2021-08-29 18:15:04 +02:00
def job_run_once_with_context(self, context):
self.result += context.job.context
2016-05-26 14:07:44 +02:00
2021-08-29 18:15:04 +02:00
def job_datetime_tests(self, context):
self.job_time = time.time()
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
def job_context_based_callback(self, context):
if (
isinstance(context, CallbackContext)
and isinstance(context.job, Job)
and isinstance(context.update_queue, Queue)
and context.job.context == 2
and context.chat_data is None
and context.user_data is None
and isinstance(context.bot_data, dict)
):
self.result += 1
def error_handler_context(self, update, context):
self.received_error = (str(context.error), context.job)
def error_handler_raise_error(self, *args):
raise Exception('Failing bigly')
def test_slot_behaviour(self, job_queue, mro_slots, _dp):
for attr in job_queue.__slots__:
assert getattr(job_queue, attr, 'err') != 'err', f"got extra slot '{attr}'"
assert len(mro_slots(job_queue)) == len(set(mro_slots(job_queue))), "duplicate slot"
def test_dispatcher_weakref(self, bot):
jq = JobQueue()
dispatcher = DispatcherBuilder().bot(bot).job_queue(None).build()
with pytest.raises(RuntimeError, match='No dispatcher was set'):
jq.dispatcher
jq.set_dispatcher(dispatcher)
assert jq.dispatcher is dispatcher
del dispatcher
with pytest.raises(RuntimeError, match='no longer alive'):
jq.dispatcher
def test_run_once(self, job_queue):
job_queue.run_once(self.job_run_once, 0.01)
sleep(0.02)
assert self.result == 1
def test_run_once_timezone(self, job_queue, timezone):
"""Test the correct handling of aware datetimes"""
# we're parametrizing this with two different UTC offsets to exclude the possibility
# of an xpass when the test is run in a timezone with the same UTC offset
when = dtm.datetime.now(timezone)
job_queue.run_once(self.job_run_once, when)
sleep(0.001)
assert self.result == 1
def test_job_with_context(self, job_queue):
job_queue.run_once(self.job_run_once_with_context, 0.01, context=5)
sleep(0.02)
assert self.result == 5
def test_run_repeating(self, job_queue):
job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.05)
assert self.result == 2
def test_run_repeating_first(self, job_queue):
job_queue.run_repeating(self.job_run_once, 0.05, first=0.2)
sleep(0.15)
assert self.result == 0
sleep(0.07)
assert self.result == 1
2020-10-06 19:28:40 +02:00
def test_run_repeating_first_timezone(self, job_queue, timezone):
"""Test correct scheduling of job when passing a timezone-aware datetime as ``first``"""
job_queue.run_repeating(
self.job_run_once, 0.1, first=dtm.datetime.now(timezone) + dtm.timedelta(seconds=0.05)
)
sleep(0.1)
assert self.result == 1
def test_run_repeating_last(self, job_queue):
job_queue.run_repeating(self.job_run_once, 0.05, last=0.06)
sleep(0.1)
assert self.result == 1
sleep(0.1)
assert self.result == 1
2020-10-06 19:28:40 +02:00
def test_run_repeating_last_timezone(self, job_queue, timezone):
"""Test correct scheduling of job when passing a timezone-aware datetime as ``first``"""
job_queue.run_repeating(
self.job_run_once, 0.05, last=dtm.datetime.now(timezone) + dtm.timedelta(seconds=0.06)
)
2020-10-06 19:28:40 +02:00
sleep(0.1)
assert self.result == 1
sleep(0.1)
assert self.result == 1
def test_run_repeating_last_before_first(self, job_queue):
with pytest.raises(ValueError, match="'last' must not be before 'first'!"):
job_queue.run_repeating(self.job_run_once, 0.05, first=1, last=0.5)
def test_run_repeating_timedelta(self, job_queue):
job_queue.run_repeating(self.job_run_once, dtm.timedelta(minutes=3.3333e-4))
sleep(0.05)
assert self.result == 2
def test_run_custom(self, job_queue):
job_queue.run_custom(self.job_run_once, {'trigger': 'interval', 'seconds': 0.02})
sleep(0.05)
assert self.result == 2
def test_multiple(self, job_queue):
job_queue.run_once(self.job_run_once, 0.01)
job_queue.run_once(self.job_run_once, 0.02)
job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.055)
assert self.result == 4
def test_disabled(self, job_queue):
j1 = job_queue.run_once(self.job_run_once, 0.1)
j2 = job_queue.run_repeating(self.job_run_once, 0.05)
j1.enabled = False
j2.enabled = False
sleep(0.06)
assert self.result == 0
j1.enabled = True
sleep(0.2)
assert self.result == 1
def test_schedule_removal(self, job_queue):
j1 = job_queue.run_once(self.job_run_once, 0.03)
j2 = job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.025)
j1.schedule_removal()
j2.schedule_removal()
sleep(0.04)
assert self.result == 1
def test_schedule_removal_from_within(self, job_queue):
job_queue.run_repeating(self.job_remove_self, 0.01)
sleep(0.05)
assert self.result == 1
def test_longer_first(self, job_queue):
job_queue.run_once(self.job_run_once, 0.02)
job_queue.run_once(self.job_run_once, 0.01)
sleep(0.015)
assert self.result == 1
2016-01-05 13:32:19 +01:00
def test_error(self, job_queue):
job_queue.run_repeating(self.job_with_exception, 0.01)
job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.03)
assert self.result == 1
def test_in_dispatcher(self, bot):
dispatcher = DispatcherBuilder().bot(bot).build()
dispatcher.job_queue.start()
try:
dispatcher.job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.03)
assert self.result == 1
dispatcher.stop()
sleep(1)
assert self.result == 1
finally:
try:
dispatcher.stop()
except SchedulerNotRunningError:
pass
2016-01-04 11:08:43 +01:00
def test_time_unit_int(self, job_queue):
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
# Testing seconds in int
delta = 0.05
2016-12-14 16:27:45 +01:00
expected_time = time.time() + delta
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
job_queue.run_once(self.job_datetime_tests, delta)
sleep(0.06)
assert pytest.approx(self.job_time) == expected_time
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
def test_time_unit_dt_timedelta(self, job_queue):
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
# Testing seconds, minutes and hours as datetime.timedelta object
# This is sufficient to test that it actually works.
interval = dtm.timedelta(seconds=0.05)
expected_time = time.time() + interval.total_seconds()
Job queue time units (#452) * Adding timeunit and day support to the jobqueue * Adding tests * Changed the file permission back to 644. * Changed AssertEqual argument order to (actual, expectd). * Removed the TimeUnit enum and unit param, instead use datetime.time for interval. * Removing the TimeUnits enum and unit param in favour of optionally using a datetime.time as the interval. * Removing the TimeUnits enumeration, forgot the remove it in the last one. * Removed some old docstrings refering to the TimeUnits enum. * Removed the old TimeUnits import. * Adding some error handling for the 'days' argument (only a 'tuple' with 'Days') * Writing the error message directly in the exception. * Moving a debug statement wrongfully saying a job would be running on days it wouldn't. * Writing error messages directly in the exceptions instead of making an extra variable. * Replacing datetime.time in favour of datetime.timedelta because of the get_seconds() method. * Adding error handling for the method . * Splitting the tests up in multiple ones, no float test because I haven't found a reliable way to test it. * Excluding .exrc file. * Removing \ at EOF of ValueError. * Replacing Enums with plain new-style classes. * Using numbers.number to check for ints/floats instead of seperate int/float checks. * Fixing typo, number -> Number. * Changed lower_case Days attributes to UPPER_CASE. * Different formatting for Days class, removed the get_days function in favour of a tuple. * Removed redundant function get_days. * Edited the docstring for next_t to also take datetime.timedelta. * Removed for-loop in favour of any(). * Changed docstring for interval. * Removed debug print. * Changing some docstrings. * Changing some docstrings (again).
2016-11-08 23:39:25 +01:00
job_queue.run_once(self.job_datetime_tests, interval)
sleep(0.06)
assert pytest.approx(self.job_time) == expected_time
def test_time_unit_dt_datetime(self, job_queue):
# Testing running at a specific datetime
delta, now = dtm.timedelta(seconds=0.05), dtm.datetime.now(pytz.utc)
when = now + delta
expected_time = (now + delta).timestamp()
job_queue.run_once(self.job_datetime_tests, when)
sleep(0.06)
assert self.job_time == pytest.approx(expected_time)
def test_time_unit_dt_time_today(self, job_queue):
# Testing running at a specific time today
delta, now = 0.05, dtm.datetime.now(pytz.utc)
expected_time = now + dtm.timedelta(seconds=delta)
when = expected_time.time()
expected_time = expected_time.timestamp()
job_queue.run_once(self.job_datetime_tests, when)
sleep(0.06)
assert self.job_time == pytest.approx(expected_time)
def test_time_unit_dt_time_tomorrow(self, job_queue):
# Testing running at a specific time that has passed today. Since we can't wait a day, we
# test if the job's next scheduled execution time has been calculated correctly
delta, now = -2, dtm.datetime.now(pytz.utc)
when = (now + dtm.timedelta(seconds=delta)).time()
expected_time = (now + dtm.timedelta(seconds=delta, days=1)).timestamp()
job_queue.run_once(self.job_datetime_tests, when)
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_time)
2016-12-14 16:27:45 +01:00
def test_run_daily(self, job_queue):
delta, now = 1, dtm.datetime.now(pytz.utc)
time_of_day = (now + dtm.timedelta(seconds=delta)).time()
expected_reschedule_time = (now + dtm.timedelta(seconds=delta, days=1)).timestamp()
2016-12-14 16:27:45 +01:00
job_queue.run_daily(self.job_run_once, time_of_day)
sleep(delta + 0.1)
assert self.result == 1
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_reschedule_time)
def test_run_monthly(self, job_queue, timezone):
delta, now = 1, dtm.datetime.now(timezone)
expected_reschedule_time = now + dtm.timedelta(seconds=delta)
time_of_day = expected_reschedule_time.time().replace(tzinfo=timezone)
day = now.day
this_months_days = calendar.monthrange(now.year, now.month)[1]
if now.month == 12:
next_months_days = calendar.monthrange(now.year + 1, 1)[1]
else:
next_months_days = calendar.monthrange(now.year, now.month + 1)[1]
expected_reschedule_time += dtm.timedelta(this_months_days)
if day > next_months_days:
expected_reschedule_time += dtm.timedelta(next_months_days)
expected_reschedule_time = timezone.normalize(expected_reschedule_time)
2020-09-27 12:59:48 +02:00
# Adjust the hour for the special case that between now and next month a DST switch happens
expected_reschedule_time += dtm.timedelta(
hours=time_of_day.hour - expected_reschedule_time.hour
)
expected_reschedule_time = expected_reschedule_time.timestamp()
job_queue.run_monthly(self.job_run_once, time_of_day, day)
sleep(delta + 0.1)
assert self.result == 1
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_reschedule_time)
def test_run_monthly_non_strict_day(self, job_queue, timezone):
delta, now = 1, dtm.datetime.now(timezone)
expected_reschedule_time = now + dtm.timedelta(seconds=delta)
time_of_day = expected_reschedule_time.time().replace(tzinfo=timezone)
expected_reschedule_time += dtm.timedelta(
calendar.monthrange(now.year, now.month)[1]
) - dtm.timedelta(days=now.day)
# Adjust the hour for the special case that between now & end of month a DST switch happens
expected_reschedule_time = timezone.normalize(expected_reschedule_time)
expected_reschedule_time += dtm.timedelta(
hours=time_of_day.hour - expected_reschedule_time.hour
)
expected_reschedule_time = expected_reschedule_time.timestamp()
job_queue.run_monthly(self.job_run_once, time_of_day, -1)
scheduled_time = job_queue.jobs()[0].next_t.timestamp()
assert scheduled_time == pytest.approx(expected_reschedule_time)
2020-09-27 12:59:48 +02:00
def test_default_tzinfo(self, _dp, tz_bot):
# we're parametrizing this with two different UTC offsets to exclude the possibility
# of an xpass when the test is run in a timezone with the same UTC offset
jq = JobQueue()
original_bot = _dp.bot
_dp.bot = tz_bot
jq.set_dispatcher(_dp)
try:
jq.start()
when = dtm.datetime.now(tz_bot.defaults.tzinfo) + dtm.timedelta(seconds=0.0005)
jq.run_once(self.job_run_once, when.time())
sleep(0.001)
assert self.result == 1
jq.stop()
finally:
_dp.bot = original_bot
2021-08-29 18:15:04 +02:00
def test_get_jobs(self, job_queue):
callback = self.job_context_based_callback
job1 = job_queue.run_once(callback, 10, name='name1')
job2 = job_queue.run_once(callback, 10, name='name1')
job3 = job_queue.run_once(callback, 10, name='name2')
2018-02-19 09:36:40 +01:00
assert job_queue.jobs() == (job1, job2, job3)
assert job_queue.get_jobs_by_name('name1') == (job1, job2)
assert job_queue.get_jobs_by_name('name2') == (job3,)
2021-08-29 18:15:04 +02:00
def test_job_run(self, _dp):
job_queue = JobQueue()
job_queue.set_dispatcher(_dp)
2021-08-29 18:15:04 +02:00
job = job_queue.run_repeating(self.job_context_based_callback, 0.02, context=2)
assert self.result == 0
job.run(_dp)
assert self.result == 1
def test_enable_disable_job(self, job_queue):
job = job_queue.run_repeating(self.job_run_once, 0.02)
Job.next_t (#1685) * next_t property is added to Job class Added new property to Job class - next_t, it will show the datetime when the job will be executed next time. The property is updated during JobQueue._put method, right after job is added to queue. Related to #1676 * Fixed newline and trailing whitespace * Fixed PR issues, added test 1. Added setter for next_t - now JobQueue doesn't access protected Job._next_t. 2. Fixed Job class docstring. 3. Added test for next_t property. 4. Set next_t to None for run_once jobs that already ran. * Fixed Flake8 issues * Added next_t setter for datetime, added test 1. next_t setter now can accept datetime type. 2. added test for setting datetime to next_t and added some asserts that check tests results. 3. Also noticed Job.days setter raises ValueError when it's more appropriate to raise TypeError. * Fixed test_warnings, added Number type to next_t setter 1. Changed type of error raised by interval setter from ValueError to TypeError.. 2. Fixed test_warning after changing type of errors in Job.days and Job.interval. 3. Added Number type to next_t setter - now it can accept int too. * Python 2 compatibility for test_job_next_t_property Added _UTC and _UtcOffsetTimezone for python 2 compatibility * Fixed PR issues 1. Replaced "datetime.replace tzinfo" with "datetime.astimezone" 2. Moved testing next_t setter to separate test. 3. Changed test_job_next_t_setter so it now uses non UTC timezone. * Defining tzinfo from run_once, run_repeating 1. Added option to define Job.tzinfo from run_once (by when.tzinfo) and run_repeating (first.tzinfo) 2. Added test to check that tzinfo is always passed correctly. * address review Co-authored-by: Hinrich Mahler <hinrich.mahler@freenet.de>
2020-04-18 15:08:16 +02:00
sleep(0.05)
assert self.result == 2
job.enabled = False
assert not job.enabled
sleep(0.05)
assert self.result == 2
job.enabled = True
assert job.enabled
sleep(0.05)
assert self.result == 4
Job.next_t (#1685) * next_t property is added to Job class Added new property to Job class - next_t, it will show the datetime when the job will be executed next time. The property is updated during JobQueue._put method, right after job is added to queue. Related to #1676 * Fixed newline and trailing whitespace * Fixed PR issues, added test 1. Added setter for next_t - now JobQueue doesn't access protected Job._next_t. 2. Fixed Job class docstring. 3. Added test for next_t property. 4. Set next_t to None for run_once jobs that already ran. * Fixed Flake8 issues * Added next_t setter for datetime, added test 1. next_t setter now can accept datetime type. 2. added test for setting datetime to next_t and added some asserts that check tests results. 3. Also noticed Job.days setter raises ValueError when it's more appropriate to raise TypeError. * Fixed test_warnings, added Number type to next_t setter 1. Changed type of error raised by interval setter from ValueError to TypeError.. 2. Fixed test_warning after changing type of errors in Job.days and Job.interval. 3. Added Number type to next_t setter - now it can accept int too. * Python 2 compatibility for test_job_next_t_property Added _UTC and _UtcOffsetTimezone for python 2 compatibility * Fixed PR issues 1. Replaced "datetime.replace tzinfo" with "datetime.astimezone" 2. Moved testing next_t setter to separate test. 3. Changed test_job_next_t_setter so it now uses non UTC timezone. * Defining tzinfo from run_once, run_repeating 1. Added option to define Job.tzinfo from run_once (by when.tzinfo) and run_repeating (first.tzinfo) 2. Added test to check that tzinfo is always passed correctly. * address review Co-authored-by: Hinrich Mahler <hinrich.mahler@freenet.de>
2020-04-18 15:08:16 +02:00
def test_remove_job(self, job_queue):
job = job_queue.run_repeating(self.job_run_once, 0.02)
sleep(0.05)
assert self.result == 2
assert not job.removed
job.schedule_removal()
assert job.removed
sleep(0.05)
Job.next_t (#1685) * next_t property is added to Job class Added new property to Job class - next_t, it will show the datetime when the job will be executed next time. The property is updated during JobQueue._put method, right after job is added to queue. Related to #1676 * Fixed newline and trailing whitespace * Fixed PR issues, added test 1. Added setter for next_t - now JobQueue doesn't access protected Job._next_t. 2. Fixed Job class docstring. 3. Added test for next_t property. 4. Set next_t to None for run_once jobs that already ran. * Fixed Flake8 issues * Added next_t setter for datetime, added test 1. next_t setter now can accept datetime type. 2. added test for setting datetime to next_t and added some asserts that check tests results. 3. Also noticed Job.days setter raises ValueError when it's more appropriate to raise TypeError. * Fixed test_warnings, added Number type to next_t setter 1. Changed type of error raised by interval setter from ValueError to TypeError.. 2. Fixed test_warning after changing type of errors in Job.days and Job.interval. 3. Added Number type to next_t setter - now it can accept int too. * Python 2 compatibility for test_job_next_t_property Added _UTC and _UtcOffsetTimezone for python 2 compatibility * Fixed PR issues 1. Replaced "datetime.replace tzinfo" with "datetime.astimezone" 2. Moved testing next_t setter to separate test. 3. Changed test_job_next_t_setter so it now uses non UTC timezone. * Defining tzinfo from run_once, run_repeating 1. Added option to define Job.tzinfo from run_once (by when.tzinfo) and run_repeating (first.tzinfo) 2. Added test to check that tzinfo is always passed correctly. * address review Co-authored-by: Hinrich Mahler <hinrich.mahler@freenet.de>
2020-04-18 15:08:16 +02:00
assert self.result == 2
def test_job_lt_eq(self, job_queue):
job = job_queue.run_repeating(self.job_run_once, 0.02)
assert not job == job_queue
assert not job < job
2021-08-29 18:15:04 +02:00
def test_dispatch_error_context(self, job_queue, dp):
dp.add_error_handler(self.error_handler_context)
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(0.1)
assert self.received_error[0] == 'Test Error'
assert self.received_error[1] is job
self.received_error = None
job.run(dp)
assert self.received_error[0] == 'Test Error'
assert self.received_error[1] is job
# Remove handler
2021-08-29 18:15:04 +02:00
dp.remove_error_handler(self.error_handler_context)
self.received_error = None
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(0.1)
assert self.received_error is None
job.run(dp)
assert self.received_error is None
def test_dispatch_error_that_raises_errors(self, job_queue, dp, caplog):
dp.add_error_handler(self.error_handler_raise_error)
with caplog.at_level(logging.ERROR):
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(0.1)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'An error was raised and an uncaught' in rec.getMessage()
caplog.clear()
with caplog.at_level(logging.ERROR):
job.run(dp)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'uncaught error was raised while handling' in rec.getMessage()
caplog.clear()
# Remove handler
dp.remove_error_handler(self.error_handler_raise_error)
self.received_error = None
with caplog.at_level(logging.ERROR):
job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(0.1)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'No error handlers are registered' in rec.getMessage()
caplog.clear()
with caplog.at_level(logging.ERROR):
job.run(dp)
assert len(caplog.records) == 1
rec = caplog.records[-1]
assert 'No error handlers are registered' in rec.getMessage()
def test_custom_context(self, bot, job_queue):
dispatcher = (
DispatcherBuilder()
.bot(bot)
.context_types(
ContextTypes(
context=CustomContext, bot_data=int, user_data=float, chat_data=complex
)
)
.build()
)
job_queue.set_dispatcher(dispatcher)
def callback(context):
self.result = (
type(context),
context.user_data,
context.chat_data,
type(context.bot_data),
)
job_queue.run_once(callback, 0.1)
sleep(0.15)
assert self.result == (CustomContext, None, None, int)