Make Tests for telegram.ext Independent of Networking (#4454)

This commit is contained in:
Bibo-Joshi 2024-09-09 07:32:32 +02:00 committed by GitHub
parent b9d2efdec5
commit 0b352b043e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 243 additions and 208 deletions

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2024
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import asyncio
async def return_true(*args, **kwargs):
return True
async def empty_get_updates(*args, **kwargs):
# The `await` gives the event loop a chance to run other tasks
await asyncio.sleep(0)
return []

View file

@ -17,7 +17,7 @@
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
from pathlib import Path
from typing import Optional
from typing import Optional, Tuple
import pytest
from httpx import AsyncClient, AsyncHTTPTransport, Response
@ -26,7 +26,7 @@ from telegram._utils.defaultvalue import DEFAULT_NONE
from telegram._utils.strings import TextEncoding
from telegram._utils.types import ODVInput
from telegram.error import BadRequest, RetryAfter, TimedOut
from telegram.request import HTTPXRequest, RequestData
from telegram.request import BaseRequest, HTTPXRequest, RequestData
class NonchalantHttpxRequest(HTTPXRequest):
@ -60,6 +60,33 @@ class NonchalantHttpxRequest(HTTPXRequest):
pytest.xfail(f"Ignoring TimedOut error: {e}")
class OfflineRequest(BaseRequest):
"""This Request class disallows making requests to Telegram's servers.
Use this in tests that should not depend on the network.
"""
async def initialize(self) -> None:
pass
async def shutdown(self) -> None:
pass
def __init__(self, *args, **kwargs):
pass
async def do_request(
self,
url: str,
method: str,
request_data: Optional[RequestData] = None,
read_timeout: ODVInput[float] = BaseRequest.DEFAULT_NONE,
write_timeout: ODVInput[float] = BaseRequest.DEFAULT_NONE,
connect_timeout: ODVInput[float] = BaseRequest.DEFAULT_NONE,
pool_timeout: ODVInput[float] = BaseRequest.DEFAULT_NONE,
) -> Tuple[int, bytes]:
pytest.fail("OfflineRequest: Network access disallowed in this test")
async def expect_bad_request(func, message, reason):
"""
Wrapper for testing bot functions expected to result in an :class:`telegram.error.BadRequest`.

View file

@ -25,7 +25,7 @@ from telegram.ext import Application, ExtBot, Updater
from tests.auxil.ci_bots import BOT_INFO_PROVIDER
from tests.auxil.constants import PRIVATE_KEY
from tests.auxil.envvars import TEST_WITH_OPT_DEPS
from tests.auxil.networking import NonchalantHttpxRequest
from tests.auxil.networking import NonchalantHttpxRequest, OfflineRequest
def _get_bot_user(token: str) -> User:
@ -93,17 +93,20 @@ class PytestUpdater(Updater):
pass
def make_bot(bot_info=None, **kwargs):
def make_bot(bot_info=None, offline: bool = False, **kwargs):
"""
Tests are executed on tg.ext.ExtBot, as that class only extends the functionality of tg.bot
"""
token = kwargs.pop("token", (bot_info or {}).get("token"))
private_key = kwargs.pop("private_key", PRIVATE_KEY)
kwargs.pop("token", None)
request_class = OfflineRequest if offline else NonchalantHttpxRequest
return PytestExtBot(
token=token,
private_key=private_key if TEST_WITH_OPT_DEPS else None,
request=NonchalantHttpxRequest(8),
get_updates_request=NonchalantHttpxRequest(1),
request=request_class(8),
get_updates_request=request_class(1),
**kwargs,
)

View file

@ -37,15 +37,14 @@ from telegram import (
Update,
User,
)
from telegram.ext import ApplicationBuilder, Defaults, Updater
from telegram.ext.filters import MessageFilter, UpdateFilter
from telegram.ext import Defaults
from tests.auxil.build_messages import DATE
from tests.auxil.ci_bots import BOT_INFO_PROVIDER
from tests.auxil.constants import PRIVATE_KEY
from tests.auxil.envvars import RUN_TEST_OFFICIAL, TEST_WITH_OPT_DEPS
from tests.auxil.files import data_file
from tests.auxil.networking import NonchalantHttpxRequest
from tests.auxil.pytest_classes import PytestApplication, PytestBot, make_bot
from tests.auxil.pytest_classes import PytestBot, make_bot
from tests.auxil.timezones import BasicTimezone
if TEST_WITH_OPT_DEPS:
@ -124,6 +123,15 @@ async def bot(bot_info):
yield _bot
@pytest.fixture(scope="session")
async def offline_bot(bot_info):
"""Makes an offline Bot instance with the given bot_info
Note that in tests/ext we also override the `bot` fixture to return the offline bot instead.
"""
async with make_bot(bot_info, offline=True) as _bot:
yield _bot
@pytest.fixture
def one_time_bot(bot_info):
"""A function scoped bot since the session bot would shutdown when `async with app` finishes"""
@ -211,28 +219,6 @@ def subscription_channel_id(bot_info):
return bot_info["subscription_channel_id"]
@pytest.fixture
async def app(bot_info):
# We build a new bot each time so that we use `app` in a context manager without problems
application = (
ApplicationBuilder().bot(make_bot(bot_info)).application_class(PytestApplication).build()
)
yield application
if application.running:
await application.stop()
await application.shutdown()
@pytest.fixture
async def updater(bot_info):
# We build a new bot each time so that we use `updater` in a context manager without problems
up = Updater(bot=make_bot(bot_info), update_queue=asyncio.Queue())
yield up
if up.running:
await up.stop()
await up.shutdown()
@pytest.fixture
def thumb_file():
with data_file("thumb.jpg").open("rb") as f:
@ -245,23 +231,6 @@ def class_thumb_file():
yield f
@pytest.fixture(
scope="class",
params=[{"class": MessageFilter}, {"class": UpdateFilter}],
ids=["MessageFilter", "UpdateFilter"],
)
def mock_filter(request):
class MockFilter(request.param["class"]):
def __init__(self):
super().__init__()
self.tested = False
def filter(self, _):
self.tested = True
return MockFilter()
def _get_false_update_fixture_decorator_params():
message = Message(1, DATE, Chat(1, ""), from_user=User(1, "", False), text="test")
params = [

102
tests/ext/conftest.py Normal file
View file

@ -0,0 +1,102 @@
import asyncio
import pytest
from telegram.ext import ApplicationBuilder, Updater
from telegram.ext.filters import MessageFilter, UpdateFilter
from tests.auxil.constants import PRIVATE_KEY
from tests.auxil.envvars import TEST_WITH_OPT_DEPS
from tests.auxil.monkeypatch import return_true
from tests.auxil.networking import OfflineRequest
from tests.auxil.pytest_classes import PytestApplication, PytestBot, make_bot
# This module overrides the bot fixtures defined in the global conftest.py to use the offline bot.
# We don't want the tests on telegram.ext to depend on the network, so we use the offline bot
# instead.
@pytest.fixture(scope="session")
async def bot(bot_info, offline_bot):
return offline_bot
@pytest.fixture
async def app(bot_info, monkeypatch):
# We build a new bot each time so that we use `app` in a context manager without problems
application = (
ApplicationBuilder()
.bot(make_bot(bot_info, offline=True))
.application_class(PytestApplication)
.build()
)
monkeypatch.setattr(application.bot, "delete_webhook", return_true)
monkeypatch.setattr(application.bot, "set_webhook", return_true)
yield application
if application.running:
await application.stop()
await application.shutdown()
@pytest.fixture
async def updater(bot_info, monkeypatch):
# We build a new bot each time so that we use `updater` in a context manager without problems
up = Updater(bot=make_bot(bot_info, offline=True), update_queue=asyncio.Queue())
monkeypatch.setattr(up.bot, "delete_webhook", return_true)
monkeypatch.setattr(up.bot, "set_webhook", return_true)
yield up
if up.running:
await up.stop()
await up.shutdown()
@pytest.fixture
def one_time_bot(bot_info):
"""A function scoped bot since the session bot would shutdown when `async with app` finishes"""
return make_bot(bot_info, offline=True)
@pytest.fixture(scope="session")
async def cdc_bot(bot_info):
"""Makes an ExtBot instance with the given bot_info that uses arbitrary callback_data"""
async with make_bot(bot_info, arbitrary_callback_data=True, offline=True) as _bot:
yield _bot
@pytest.fixture(scope="session")
async def raw_bot(bot_info):
"""Makes an regular Bot instance with the given bot_info"""
async with PytestBot(
bot_info["token"],
private_key=PRIVATE_KEY if TEST_WITH_OPT_DEPS else None,
request=OfflineRequest(),
get_updates_request=OfflineRequest(),
) as _bot:
yield _bot
@pytest.fixture
async def one_time_raw_bot(bot_info):
"""Makes an regular Bot instance with the given bot_info"""
return PytestBot(
bot_info["token"],
private_key=PRIVATE_KEY if TEST_WITH_OPT_DEPS else None,
request=OfflineRequest(),
get_updates_request=OfflineRequest(),
)
@pytest.fixture(
scope="class",
params=[{"class": MessageFilter}, {"class": UpdateFilter}],
ids=["MessageFilter", "UpdateFilter"],
)
def mock_filter(request):
class MockFilter(request.param["class"]):
def __init__(self):
super().__init__()
self.tested = False
def filter(self, _):
self.tested = True
return MockFilter()

View file

@ -60,6 +60,7 @@ from telegram.warnings import PTBDeprecationWarning, PTBUserWarning
from tests.auxil.asyncio_helpers import call_after
from tests.auxil.build_messages import make_message_update
from tests.auxil.files import PROJECT_ROOT_PATH
from tests.auxil.monkeypatch import empty_get_updates, return_true
from tests.auxil.networking import send_webhook_message
from tests.auxil.pytest_classes import PytestApplication, PytestUpdater, make_bot
from tests.auxil.slots import mro_slots
@ -432,7 +433,7 @@ class TestApplication:
@pytest.mark.parametrize("job_queue", [True, False])
@pytest.mark.filterwarnings("ignore::telegram.warnings.PTBUserWarning")
async def test_start_stop_processing_updates(self, one_time_bot, job_queue):
async def test_start_stop_processing_updates(self, one_time_bot, job_queue, monkeypatch):
# TODO: repeat a similar test for create_task, persistence processing and job queue
if job_queue:
app = ApplicationBuilder().bot(one_time_bot).build()
@ -442,6 +443,9 @@ class TestApplication:
async def callback(u, c):
self.received = u
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
assert not app.running
assert not app.updater.running
if job_queue:
@ -1519,11 +1523,6 @@ class TestApplication:
def test_run_polling_timeout_deprecation_warnings(
self, timeout_name, monkeypatch, recwarn, app
):
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -1536,7 +1535,7 @@ class TestApplication:
os.kill(os.getpid(), signal.SIGINT)
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
thread = Thread(target=thread_target)
thread.start()
@ -1563,11 +1562,6 @@ class TestApplication:
def test_run_polling_post_init(self, one_time_bot, monkeypatch):
events = []
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -1589,7 +1583,7 @@ class TestApplication:
.build()
)
app.bot._unfreeze()
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
monkeypatch.setattr(
app, "initialize", call_after(app.initialize, lambda _: events.append("init"))
)
@ -1598,6 +1592,7 @@ class TestApplication:
"start_polling",
call_after(app.updater.start_polling, lambda _: events.append("start_polling")),
)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
thread = Thread(target=thread_target)
thread.start()
@ -1612,11 +1607,6 @@ class TestApplication:
def test_run_polling_post_shutdown(self, one_time_bot, monkeypatch):
events = []
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -1638,7 +1628,7 @@ class TestApplication:
.build()
)
app.bot._unfreeze()
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
monkeypatch.setattr(
app, "shutdown", call_after(app.shutdown, lambda _: events.append("shutdown"))
)
@ -1647,6 +1637,7 @@ class TestApplication:
"shutdown",
call_after(app.updater.shutdown, lambda _: events.append("updater.shutdown")),
)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
thread = Thread(target=thread_target)
thread.start()
@ -1665,11 +1656,6 @@ class TestApplication:
def test_run_polling_post_stop(self, one_time_bot, monkeypatch):
events = []
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -1691,7 +1677,7 @@ class TestApplication:
.build()
)
app.bot._unfreeze()
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
monkeypatch.setattr(app, "stop", call_after(app.stop, lambda _: events.append("stop")))
monkeypatch.setattr(
app.updater,
@ -1703,6 +1689,7 @@ class TestApplication:
"shutdown",
call_after(app.updater.shutdown, lambda _: events.append("updater.shutdown")),
)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
thread = Thread(target=thread_target)
thread.start()
@ -1784,12 +1771,6 @@ class TestApplication:
def test_run_webhook_basic(self, app, monkeypatch, caplog):
assertions = {}
async def delete_webhook(*args, **kwargs):
return True
async def set_webhook(*args, **kwargs):
return True
def thread_target():
waited = 0
while not app.running:
@ -1820,8 +1801,6 @@ class TestApplication:
assertions["updater_not_running"] = not app.updater.running
assertions["job_queue_not_running"] = not app.job_queue.scheduler.running
monkeypatch.setattr(app.bot, "set_webhook", set_webhook)
monkeypatch.setattr(app.bot, "delete_webhook", delete_webhook)
app.add_handler(TypeHandler(object, self.callback_set_count(42)))
thread = Thread(target=thread_target)
@ -1857,17 +1836,6 @@ class TestApplication:
def test_run_webhook_post_init(self, one_time_bot, monkeypatch):
events = []
async def delete_webhook(*args, **kwargs):
return True
async def set_webhook(*args, **kwargs):
return True
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -1889,8 +1857,7 @@ class TestApplication:
.build()
)
app.bot._unfreeze()
monkeypatch.setattr(app.bot, "set_webhook", set_webhook)
monkeypatch.setattr(app.bot, "delete_webhook", delete_webhook)
monkeypatch.setattr(
app, "initialize", call_after(app.initialize, lambda _: events.append("init"))
)
@ -1899,6 +1866,8 @@ class TestApplication:
"start_webhook",
call_after(app.updater.start_webhook, lambda _: events.append("start_webhook")),
)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
monkeypatch.setattr(app.bot, "set_webhook", return_true)
thread = Thread(target=thread_target)
thread.start()
@ -1923,17 +1892,6 @@ class TestApplication:
def test_run_webhook_post_shutdown(self, one_time_bot, monkeypatch):
events = []
async def delete_webhook(*args, **kwargs):
return True
async def set_webhook(*args, **kwargs):
return True
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -1955,8 +1913,7 @@ class TestApplication:
.build()
)
app.bot._unfreeze()
monkeypatch.setattr(app.bot, "set_webhook", set_webhook)
monkeypatch.setattr(app.bot, "delete_webhook", delete_webhook)
monkeypatch.setattr(
app, "shutdown", call_after(app.shutdown, lambda _: events.append("shutdown"))
)
@ -1965,6 +1922,8 @@ class TestApplication:
"shutdown",
call_after(app.updater.shutdown, lambda _: events.append("updater.shutdown")),
)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
monkeypatch.setattr(app.bot, "set_webhook", return_true)
thread = Thread(target=thread_target)
thread.start()
@ -1993,17 +1952,6 @@ class TestApplication:
def test_run_webhook_post_stop(self, one_time_bot, monkeypatch):
events = []
async def delete_webhook(*args, **kwargs):
return True
async def set_webhook(*args, **kwargs):
return True
async def get_updates(*args, **kwargs):
# This makes sure that other coroutines have a chance of running as well
await asyncio.sleep(0)
return []
def thread_target():
waited = 0
while not app.running:
@ -2025,8 +1973,7 @@ class TestApplication:
.build()
)
app.bot._unfreeze()
monkeypatch.setattr(app.bot, "set_webhook", set_webhook)
monkeypatch.setattr(app.bot, "delete_webhook", delete_webhook)
monkeypatch.setattr(app, "stop", call_after(app.stop, lambda _: events.append("stop")))
monkeypatch.setattr(
app.updater,
@ -2038,6 +1985,8 @@ class TestApplication:
"shutdown",
call_after(app.updater.shutdown, lambda _: events.append("updater.shutdown")),
)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
monkeypatch.setattr(app.bot, "set_webhook", return_true)
thread = Thread(target=thread_target)
thread.start()
@ -2217,10 +2166,6 @@ class TestApplication:
async def callback(*args, **kwargs):
called_callbacks.add(kwargs["name"])
async def get_updates(*args, **kwargs):
await asyncio.sleep(0)
return []
for cls, method, entry in [
(Application, "initialize", "app_initialize"),
(Application, "start", "app_start"),
@ -2249,7 +2194,8 @@ class TestApplication:
.post_shutdown(functools.partial(callback, name="post_shutdown"))
.build()
)
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
app.add_handler(TypeHandler(object, update_logger_callback), group=-10)
app.add_handler(TypeHandler(object, handler_callback))
@ -2322,6 +2268,9 @@ class TestApplication:
Updater, "shutdown", call_after(Updater.shutdown, after_shutdown("updater"))
)
app = ApplicationBuilder().bot(one_time_bot).build()
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
with pytest.raises(RuntimeError, match="Test Exception"):
app.run_polling(close_loop=False)
@ -2418,7 +2367,9 @@ class TestApplication:
received_signals.append(args[0])
loop = asyncio.get_event_loop()
monkeypatch.setattr(loop, "add_signal_handler", signal_handler_test)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
def abort_app():
raise SystemExit
@ -2496,16 +2447,6 @@ class TestApplication:
called_stop_running = threading.Event()
assertions = {}
async def get_updates(*args, **kwargs):
await asyncio.sleep(0)
return []
async def delete_webhook(*args, **kwargs):
return True
async def set_webhook(*args, **kwargs):
return True
async def post_init(app):
# Simply calling app.update_queue.put_nowait(method) in the thread_target doesn't work
# for some reason (probably threading magic), so we use an event from the thread_target
@ -2523,9 +2464,7 @@ class TestApplication:
.post_init(post_init)
.build()
)
monkeypatch.setattr(app.bot, "get_updates", get_updates)
monkeypatch.setattr(app.bot, "set_webhook", set_webhook)
monkeypatch.setattr(app.bot, "delete_webhook", delete_webhook)
monkeypatch.setattr(app.bot, "get_updates", empty_get_updates)
events = []
monkeypatch.setattr(
@ -2543,6 +2482,8 @@ class TestApplication:
"shutdown",
call_after(app.shutdown, lambda _: events.append("app.shutdown")),
)
monkeypatch.setattr(app.bot, "set_webhook", return_true)
monkeypatch.setattr(app.bot, "delete_webhook", return_true)
def thread_target():
waited = 0

View file

@ -20,7 +20,6 @@
import pytest
from telegram import (
Bot,
CallbackQuery,
Chat,
InlineKeyboardButton,
@ -194,8 +193,7 @@ class TestCallbackContext:
callback_context = CallbackContext(app)
assert callback_context.application is app
def test_drop_callback_data_exception(self, bot, app):
non_ext_bot = Bot(bot.token)
def test_drop_callback_data_exception(self, bot, app, raw_bot):
update = Update(
0, message=Message(0, None, Chat(1, "chat"), from_user=User(1, "user", False))
)
@ -206,7 +204,7 @@ class TestCallbackContext:
callback_context.drop_callback_data(None)
try:
app.bot = non_ext_bot
app.bot = raw_bot
with pytest.raises(RuntimeError, match="telegram.Bot does not allow for"):
callback_context.drop_callback_data(None)
finally:

View file

@ -30,12 +30,12 @@ from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram._utils.defaultvalue import DEFAULT_NONE
from telegram.error import InvalidToken, RetryAfter, TelegramError, TimedOut
from telegram.ext import ExtBot, InvalidCallbackData, Updater
from telegram.request import HTTPXRequest
from tests.auxil.build_messages import make_message, make_message_update
from tests.auxil.envvars import TEST_WITH_OPT_DEPS
from tests.auxil.files import TEST_DATA_PATH, data_file
from tests.auxil.monkeypatch import empty_get_updates, return_true
from tests.auxil.networking import send_webhook_message
from tests.auxil.pytest_classes import PytestBot, make_bot
from tests.auxil.pytest_classes import make_bot
from tests.auxil.slots import mro_slots
UNIX_AVAILABLE = False
@ -179,14 +179,11 @@ class TestUpdater:
@pytest.mark.parametrize("method", ["start_polling", "start_webhook"])
async def test_shutdown_while_running(self, updater, method, monkeypatch):
async def set_webhook(*args, **kwargs):
return True
monkeypatch.setattr(updater.bot, "set_webhook", set_webhook)
ip = "127.0.0.1"
port = randrange(1024, 49152) # Select random port
monkeypatch.setattr(updater.bot, "get_updates", empty_get_updates)
async with updater:
if "webhook" in method:
await getattr(updater, method)(
@ -246,13 +243,12 @@ class TestUpdater:
await asyncio.sleep(0.1)
return []
orig_del_webhook = updater.bot.delete_webhook
async def delete_webhook(*args, **kwargs):
# Dropping pending updates is done by passing the parameter to delete_webhook
if kwargs.get("drop_pending_updates"):
self.message_count += 1
return await orig_del_webhook(*args, **kwargs)
await asyncio.sleep(0)
return True
monkeypatch.setattr(updater.bot, "get_updates", get_updates)
monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)
@ -264,7 +260,6 @@ class TestUpdater:
await updates.join()
await updater.stop()
assert not updater.running
assert not (await updater.bot.get_webhook_info()).url
if drop_pending_updates:
assert self.message_count == 1
else:
@ -281,7 +276,6 @@ class TestUpdater:
await updates.join()
await updater.stop()
assert not updater.running
assert not (await updater.bot.get_webhook_info()).url
self.received = []
self.message_count = 0
@ -384,11 +378,8 @@ class TestUpdater:
assert log_found
async def test_polling_mark_updates_as_read_failure(self, monkeypatch, updater, caplog):
async def get_updates(*args, **kwargs):
await asyncio.sleep(0)
return []
monkeypatch.setattr(updater.bot, "get_updates", get_updates)
monkeypatch.setattr(updater.bot, "get_updates", empty_get_updates)
async with updater:
await updater.start_polling()
@ -411,7 +402,10 @@ class TestUpdater:
assert log_found
async def test_start_polling_already_running(self, updater):
async def test_start_polling_already_running(self, updater, monkeypatch):
monkeypatch.setattr(updater.bot, "get_updates", empty_get_updates)
async with updater:
await updater.start_polling()
task = asyncio.create_task(updater.start_polling())
@ -498,15 +492,13 @@ class TestUpdater:
async def test_start_polling_bootstrap_retries(
self, updater, monkeypatch, exception_class, retries
):
async def do_request(*args, **kwargs):
async def delete_webhook(*args, **kwargs):
self.message_count += 1
raise exception_class(str(self.message_count))
async with updater:
# Patch within the context so that updater.bot.initialize can still be called
# by the context manager
monkeypatch.setattr(HTTPXRequest, "do_request", do_request)
monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)
async with updater:
if exception_class == InvalidToken:
with pytest.raises(InvalidToken, match="1"):
await updater.start_polling(bootstrap_retries=retries)
@ -705,14 +697,23 @@ class TestUpdater:
"unix", [None, "file_path", "socket_object"] if UNIX_AVAILABLE else [None]
)
async def test_webhook_basic(
self, monkeypatch, updater, drop_pending_updates, ext_bot, secret_token, unix, file_path
self,
monkeypatch,
updater,
drop_pending_updates,
ext_bot,
secret_token,
unix,
file_path,
one_time_bot,
one_time_raw_bot,
):
# Testing with both ExtBot and Bot to make sure any logic in WebhookHandler
# that depends on this distinction works
if ext_bot and not isinstance(updater.bot, ExtBot):
updater.bot = ExtBot(updater.bot.token)
updater.bot = one_time_bot
if not ext_bot and type(updater.bot) is not Bot:
updater.bot = PytestBot(updater.bot.token)
updater.bot = one_time_raw_bot
async def delete_webhook(*args, **kwargs):
# Dropping pending updates is done by passing the parameter to delete_webhook
@ -720,10 +721,7 @@ class TestUpdater:
self.message_count += 1
return True
async def set_webhook(*args, **kwargs):
return True
monkeypatch.setattr(updater.bot, "set_webhook", set_webhook)
monkeypatch.setattr(updater.bot, "set_webhook", return_true)
monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)
ip = "127.0.0.1"
@ -873,12 +871,6 @@ class TestUpdater:
await updater.start_webhook(unix="DoesntMatter", webhook_url="TOKEN")
async def test_start_webhook_already_running(self, updater, monkeypatch):
async def return_true(*args, **kwargs):
return True
monkeypatch.setattr(updater.bot, "set_webhook", return_true)
monkeypatch.setattr(updater.bot, "delete_webhook", return_true)
ip = "127.0.0.1"
port = randrange(1024, 49152) # Select random port
async with updater:
@ -977,12 +969,9 @@ class TestUpdater:
extensively in test_bot.py in conjunction with get_updates."""
updater = Updater(bot=cdc_bot, update_queue=asyncio.Queue())
async def return_true(*args, **kwargs):
return True
monkeypatch.setattr(updater.bot, "set_webhook", return_true)
try:
monkeypatch.setattr(updater.bot, "set_webhook", return_true)
monkeypatch.setattr(updater.bot, "delete_webhook", return_true)
ip = "127.0.0.1"
port = randrange(1024, 49152) # Select random port
@ -1025,11 +1014,6 @@ class TestUpdater:
updater.bot.callback_data_cache.clear_callback_queries()
async def test_webhook_invalid_ssl(self, monkeypatch, updater):
async def return_true(*args, **kwargs):
return True
monkeypatch.setattr(updater.bot, "set_webhook", return_true)
monkeypatch.setattr(updater.bot, "delete_webhook", return_true)
ip = "127.0.0.1"
port = randrange(1024, 49152) # Select random port
@ -1056,9 +1040,6 @@ class TestUpdater:
self.test_flag.append(bool(kwargs.get("certificate")))
return True
async def return_true(*args, **kwargs):
return True
orig_wh_server_init = WebhookServer.__init__
def webhook_server_init(*args, **kwargs):
@ -1066,7 +1047,7 @@ class TestUpdater:
orig_wh_server_init(*args, **kwargs)
monkeypatch.setattr(updater.bot, "set_webhook", set_webhook)
monkeypatch.setattr(updater.bot, "delete_webhook", return_true)
monkeypatch.setattr(
"telegram.ext._utils.webhookhandler.WebhookServer.__init__", webhook_server_init
)
@ -1088,15 +1069,13 @@ class TestUpdater:
async def test_start_webhook_bootstrap_retries(
self, updater, monkeypatch, exception_class, retries
):
async def do_request(*args, **kwargs):
async def set_webhook(*args, **kwargs):
self.message_count += 1
raise exception_class(str(self.message_count))
async with updater:
# Patch within the context so that updater.bot.initialize can still be called
# by the context manager
monkeypatch.setattr(HTTPXRequest, "do_request", do_request)
monkeypatch.setattr(updater.bot, "set_webhook", set_webhook)
async with updater:
if exception_class == InvalidToken:
with pytest.raises(InvalidToken, match="1"):
await updater.start_webhook(bootstrap_retries=retries)
@ -1107,11 +1086,6 @@ class TestUpdater:
)
async def test_webhook_invalid_posts(self, updater, monkeypatch):
async def return_true(*args, **kwargs):
return True
monkeypatch.setattr(updater.bot, "set_webhook", return_true)
monkeypatch.setattr(updater.bot, "delete_webhook", return_true)
ip = "127.0.0.1"
port = randrange(1024, 49152)
@ -1146,17 +1120,9 @@ class TestUpdater:
await updater.stop()
async def test_webhook_update_de_json_fails(self, monkeypatch, updater, caplog):
async def delete_webhook(*args, **kwargs):
return True
async def set_webhook(*args, **kwargs):
return True
def de_json_fails(*args, **kwargs):
raise TypeError("Invalid input")
monkeypatch.setattr(updater.bot, "set_webhook", set_webhook)
monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)
orig_de_json = Update.de_json
monkeypatch.setattr(Update, "de_json", de_json_fails)