python-telegram-bot/tests/ext/test_updater.py

1165 lines
44 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
2024-02-19 20:06:25 +01:00
# Copyright (C) 2015-2024
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import asyncio
import logging
import platform
from collections import defaultdict
from http import HTTPStatus
2021-10-13 08:12:48 +02:00
from pathlib import Path
2016-04-27 00:28:21 +02:00
from random import randrange
import pytest
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram._utils.defaultvalue import DEFAULT_NONE
from telegram.error import InvalidToken, RetryAfter, TelegramError, TimedOut
from telegram.ext import ExtBot, InvalidCallbackData, Updater
2023-02-22 20:19:46 +01:00
from tests.auxil.build_messages import make_message, make_message_update
from tests.auxil.envvars import TEST_WITH_OPT_DEPS
from tests.auxil.files import TEST_DATA_PATH, data_file
from tests.auxil.monkeypatch import empty_get_updates, return_true
2023-02-22 20:19:46 +01:00
from tests.auxil.networking import send_webhook_message
from tests.auxil.pytest_classes import make_bot
2023-02-22 20:19:46 +01:00
from tests.auxil.slots import mro_slots
# Optional-dependency probing: the webhook server and tornado's unix-socket helper are only
# imported when the test run is configured with the optional dependencies.
UNIX_AVAILABLE = False

if TEST_WITH_OPT_DEPS:
    try:
        from tornado.netutil import bind_unix_socket
    except ImportError:
        # The helper could not be imported, so the unix-socket parametrizations stay disabled
        UNIX_AVAILABLE = False
    else:
        UNIX_AVAILABLE = True

    from telegram.ext._utils.webhookhandler import WebhookServer
@pytest.mark.skipif(
    TEST_WITH_OPT_DEPS, reason="Only relevant if the optional dependency is not installed"
)
class TestNoWebhooks:
    """Checks for the case where the optional webhook dependency is missing."""

    async def test_no_webhooks(self, bot):
        """`start_webhook` must raise a `RuntimeError` that names the missing extra."""
        expected_hint = r"python-telegram-bot\[webhooks\]"
        updater = Updater(bot=bot, update_queue=asyncio.Queue())

        async with updater:
            with pytest.raises(RuntimeError, match=expected_hint):
                await updater.start_webhook()
# Main `Updater` test suite; skipped unless TEST_WITH_OPT_DEPS is truthy.
@pytest.mark.skipif(
not TEST_WITH_OPT_DEPS,
reason="Only relevant if the optional dependency is installed",
)
class TestUpdater:
# Scratch state shared by the tests; the autouse `_reset` fixture re-initialises these
# attributes on the instance.
# General-purpose counter, e.g. incremented by patched `delete_webhook`/`set_webhook` stand-ins.
message_count = 0
# Last value stored by `error_callback`/`callback`; some tests rebind it to a list.
received = None
# Re-initialised by `_reset`; not otherwise referenced in the visible part of this file.
attempts = 0
# `error_callback` calls `.set()` on this, so it must hold an event-like object by then.
err_handler_called = None
# `callback` calls `.set()` on this, so it must hold an event-like object by then.
cb_handler_called = None
Add test for clean argument of Updater.start_polling/webhook (#2002) * added test for 'clean' argument passed to 'start_polling()' * remove TODO * prettify * remove bool from func name * improve name-ing of fake update func * cleanup class and nameing * replace while for for * swap valueerror for runtimeerror * remove all other code to reduce testing * add comments * don't raise error, complete cycle and assert * remove inf loop protection * Revert "remove all other code to reduce testing" This reverts commit 4566a1debd659ce831a52085867c5a74d1575262. * remove error parametrization * remove comment * remove pass from class * rename update_id to offset as the original get_updates() takes argument offset (which is the update_id) * rename test func to match original func * fix comment * shorten for loop * mock get_updates() behavior when 'offset' is passed. Assert with get_updates() * remove other functions to reduce testing * replicate original get_updates() * move fakeupdate class and list creation outside get_updates and store in var * loop from 0 to make update_id consistant w array key, just easier to debug * update comments * Revert "remove other functions to reduce testing" This reverts commit 1fb498a6ccb619a745b745d61b8345bf1b29b6ba. * fix typo * Revert "fix typo" This reverts commit ade9fec609d7262051b24d17fa6b5b1cf579d760. * Revert "Revert "remove other functions to reduce testing"" This reverts commit 734de1371cfddbd003ceb3c4498e837bab55bd05. * Revert "update comments" This reverts commit f3a032e75eee8d2416a236208b71c35ccc237345. * Revert "loop from 0 to make update_id consistant w array key, just easier to debug" This reverts commit 0c6881d8a1066d5762131d28fb350eaeb92b341c. * Revert "move fakeupdate class and list creation outside get_updates and store in var" This reverts commit 71de999300f052afd6b0ffe2914bfdfc201b6b11. * Revert "replicate original get_updates()" This reverts commit 5d0710ac3a8e6f6ddf628b1ad1d53bfa219decdc. 
* Revert "remove other functions to reduce testing" This reverts commit 1fb498a6ccb619a745b745d61b8345bf1b29b6ba. * Revert "mock get_updates() behavior when 'offset' is passed. Assert with get_updates()" This reverts commit 8c727ba1e814871a59d7ae66d3dcf411847330d2. * loop from 0 to make update_id consistant w array key, for consitency Co-authored-by: ikkemaniac <ikkemaniac@localhost>
2020-06-24 00:25:58 +02:00
# Not touched by `_reset` and not read via `self` in the visible part of this file.
offset = 0
# Multi-purpose marker; individual tests rebind it to a bool, list, dict or str.
test_flag = False
# Template for the HTML error body that tests compare against webhook responses:
# `{0}` is filled with the message and `{1}` with the HTTP status.
response_text = "<html><title>{1}: {0}</title><body>{1}: {0}</body></html>"
2015-11-21 15:45:45 +01:00
@pytest.fixture(autouse=True)
def _reset(self):
    """Restore the per-test scratch attributes to their initial values."""
    # Counters start from zero again
    self.message_count = self.attempts = 0
    # Nothing received and no events attached yet
    self.received = self.err_handler_called = self.cb_handler_called = None
    self.test_flag = False
2016-04-21 13:07:44 +02:00
# pytest's temp_path is not used here because the resulting file path gets too long on macOS
@pytest.fixture
def file_path(self) -> str:
    """Yield the path of a socket file inside the test data folder and remove it afterwards."""
    socket_file = TEST_DATA_PATH / "test.sock"
    yield str(socket_file)
    # Teardown: the file may never have been created, so a missing file is fine
    socket_file.unlink(missing_ok=True)
def error_callback(self, error):
    """Record ``error`` as the last received value and signal ``err_handler_called``."""
    # Store first, signal second: waiters woken by the event can rely on `received` being set
    self.received = error
    signal = self.err_handler_called
    signal.set()
2015-11-21 19:35:24 +01:00
2021-08-29 18:15:04 +02:00
def callback(self, update, context):
    """Record the text of ``update.message`` and signal ``cb_handler_called``.

    ``context`` is accepted but not used.
    """
    message = update.message
    self.received = message.text
    signal = self.cb_handler_called
    signal.set()
2023-02-22 20:19:46 +01:00
async def test_slot_behaviour(self, updater):
    """Every declared slot must be set on the instance and no slot may be declared twice."""
    async with updater:
        for at in updater.__slots__:
            # Name-mangled (double underscore, non-dunder) slots live under `_Updater<name>`
            is_mangled = at.startswith("__") and not at.endswith("__")
            attr = f"_Updater{at}" if is_mangled else at
            assert getattr(updater, attr, "err") != "err", f"got extra slot '{attr}'"

        all_slots = mro_slots(updater)
        assert len(all_slots) == len(set(all_slots)), "duplicate slot"
def test_init(self, bot):
    """The constructor must store exactly the bot and queue objects it was given."""
    update_queue = asyncio.Queue()
    instance = Updater(bot=bot, update_queue=update_queue)

    assert instance.bot is bot
    assert instance.update_queue is update_queue
def test_repr(self, bot):
    """`repr` of an updater must embed the `repr` of its bot."""
    instance = Updater(bot=bot, update_queue=asyncio.Queue())
    expected = f"Updater[bot={instance.bot!r}]"

    assert repr(instance) == expected
async def test_initialize(self, bot, monkeypatch):
    """`Updater.initialize` must call the `initialize` method of its bot."""

    async def initialize_bot(*args, **kwargs):
        # Leave a trace that the bot was asked to initialize
        self.test_flag = True

    async with make_bot(token=bot.token) as test_bot:
        monkeypatch.setattr(test_bot, "initialize", initialize_bot)

        instance = Updater(bot=test_bot, update_queue=asyncio.Queue())
        await instance.initialize()

    assert self.test_flag
async def test_shutdown(self, bot, monkeypatch):
    """`Updater.shutdown` must call the `shutdown` method of its bot."""

    async def shutdown_bot(*args, **kwargs):
        # Leave a trace that the bot was asked to shut down
        self.test_flag = True

    async with make_bot(token=bot.token) as test_bot:
        monkeypatch.setattr(test_bot, "shutdown", shutdown_bot)

        instance = Updater(bot=test_bot, update_queue=asyncio.Queue())
        await instance.initialize()
        await instance.shutdown()

    assert self.test_flag
async def test_multiple_inits_and_shutdowns(self, updater, monkeypatch):
    """Repeated `initialize`/`shutdown` calls must reach the bot only once each."""
    self.test_flag = defaultdict(int)

    async def initialize(*args, **kwargs):
        self.test_flag["init"] += 1

    async def shutdown(*args, **kwargs):
        self.test_flag["shutdown"] += 1

    monkeypatch.setattr(updater.bot, "initialize", initialize)
    monkeypatch.setattr(updater.bot, "shutdown", shutdown)

    repetitions = 3
    for _ in range(repetitions):
        await updater.initialize()
    for _ in range(repetitions):
        await updater.shutdown()

    assert self.test_flag["init"] == 1
    assert self.test_flag["shutdown"] == 1
async def test_multiple_init_cycles(self, updater):
    """Entering and leaving the updater context twice in a row must simply work."""
    # There is nothing to assert here - completing both cycles without an error is the test
    for _ in range(2):
        async with updater:
            await updater.bot.get_me()
@pytest.mark.parametrize("method", ["start_polling", "start_webhook"])
async def test_start_without_initialize(self, updater, method):
    """Starting an updater that was never initialized must raise a `RuntimeError`."""
    with pytest.raises(RuntimeError, match="not initialized"):
        start = getattr(updater, method)
        await start()
@pytest.mark.parametrize("method", ["start_polling", "start_webhook"])
async def test_shutdown_while_running(self, updater, method, monkeypatch):
    """`shutdown` must raise while the updater is still running."""
    ip = "127.0.0.1"
    port = randrange(1024, 49152)  # Select random port
    # Only the webhook variant is given an address to bind to
    start_kwargs = {"ip_address": ip, "port": port} if "webhook" in method else {}

    monkeypatch.setattr(updater.bot, "get_updates", empty_get_updates)

    async with updater:
        start = getattr(updater, method)
        await start(**start_kwargs)

        with pytest.raises(RuntimeError, match="still running"):
            await updater.shutdown()
        await updater.stop()
async def test_context_manager(self, monkeypatch, updater):
    """`async with updater` must call `initialize` on entry and `shutdown` on exit."""

    async def initialize(*args, **kwargs):
        self.test_flag = ["initialize"]

    async def shutdown(*args, **kwargs):
        self.test_flag.append("stop")

    # Patch the class so that the context manager protocol picks up the stand-ins
    for name, replacement in (("initialize", initialize), ("shutdown", shutdown)):
        monkeypatch.setattr(Updater, name, replacement)

    async with updater:
        pass

    assert self.test_flag == ["initialize", "stop"]
async def test_context_manager_exception_on_init(self, monkeypatch, updater):
    """A failing `initialize` must propagate and `shutdown` must still be called."""

    async def initialize(*args, **kwargs):
        raise RuntimeError("initialize")

    async def shutdown(*args):
        self.test_flag = "stop"

    # Patch the class so that the context manager protocol picks up the stand-ins
    for name, replacement in (("initialize", initialize), ("shutdown", shutdown)):
        monkeypatch.setattr(Updater, name, replacement)

    with pytest.raises(RuntimeError, match="initialize"):
        async with updater:
            pass

    assert self.test_flag == "stop"
2023-03-25 19:18:04 +01:00
@pytest.mark.parametrize("drop_pending_updates", [True, False])
async def test_polling_basic(self, monkeypatch, updater, drop_pending_updates):
    """Polling must forward all updates to the queue, also after restarting the updater."""
    updates = asyncio.Queue()
    for update_id in (1, 2):
        await updates.put(Update(update_id=update_id))

    async def get_updates(*args, **kwargs):
        if updates.empty():
            await asyncio.sleep(0.1)
            return []

        next_update = await updates.get()
        updates.task_done()
        return [next_update]

    async def delete_webhook(*args, **kwargs):
        # Dropping pending updates is done by passing the parameter to delete_webhook
        if kwargs.get("drop_pending_updates"):
            self.message_count += 1
        await asyncio.sleep(0)
        return True

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)
    monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)

    async with updater:
        return_value = await updater.start_polling(drop_pending_updates=drop_pending_updates)
        assert return_value is updater.update_queue
        assert updater.running
        await updates.join()
        await updater.stop()
        assert not updater.running

        # Exactly one `delete_webhook` call asked to drop updates - or none at all
        assert self.message_count == (1 if drop_pending_updates else 0)

        for update_id in (3, 4):
            await updates.put(Update(update_id=update_id))

        # We call the same logic twice to make sure that restarting the updater works as well
        await updater.start_polling(drop_pending_updates=drop_pending_updates)
        assert updater.running
        task_names = [task.get_name() for task in asyncio.all_tasks()]
        assert any("Updater:start_polling:polling_task" in name for name in task_names)
        await updates.join()
        await updater.stop()
        assert not updater.running

    # Drain the update queue and verify that every update arrived exactly once and in order
    self.received = []
    self.message_count = 0
    while not updater.update_queue.empty():
        fetched = updater.update_queue.get_nowait()
        self.message_count += 1
        self.received.append(fetched.update_id)

    assert self.message_count == 4
    assert self.received == [1, 2, 3, 4]
async def test_polling_mark_updates_as_read(self, monkeypatch, updater, caplog):
    """`stop` must call `get_updates` once more so the last update is marked as read."""
    updates = asyncio.Queue()
    max_update_id = 3
    for update_id in range(1, max_update_id + 1):
        await updates.put(Update(update_id=update_id))

    tracking_flag = False
    received_kwargs = {}
    expected_kwargs = {
        "timeout": 0,
        "read_timeout": "read_timeout",
        "connect_timeout": "connect_timeout",
        "write_timeout": "write_timeout",
        "pool_timeout": "pool_timeout",
        "allowed_updates": "allowed_updates",
    }

    async def get_updates(*args, **kwargs):
        if tracking_flag:
            received_kwargs.update(kwargs)

        if updates.empty():
            await asyncio.sleep(0)
            return []

        next_update = await updates.get()
        updates.task_done()
        return [next_update]

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)

    async with updater:
        await updater.start_polling(**expected_kwargs)
        await updates.join()
        assert not received_kwargs

        # Set the flag only now since we want to make sure that the get_updates
        # is called one last time by updater.stop()
        tracking_flag = True
        with caplog.at_level(logging.DEBUG):
            await updater.stop()

        # ensure that the last fetched update was still marked as read
        assert received_kwargs["offset"] == max_update_id + 1
        # ensure that the correct arguments were passed to the last `get_updates` call
        for name, value in expected_kwargs.items():
            assert received_kwargs[name] == value

    assert len(caplog.records) >= 1
    # Only the first matching record is inspected
    final_call_record = next(
        (
            record
            for record in caplog.records
            if record.getMessage().startswith("Calling `get_updates` one more time")
        ),
        None,
    )
    assert final_call_record is not None
    assert final_call_record.name == "telegram.ext.Updater"
    assert final_call_record.levelno == logging.DEBUG
async def test_polling_mark_updates_as_read_timeout(self, monkeypatch, updater, caplog):
    """A `TimedOut` raised by the final `get_updates` call during `stop` must be logged."""
    timeout_event = asyncio.Event()

    async def get_updates(*args, **kwargs):
        await asyncio.sleep(0)
        # Start timing out as soon as the test flips the switch
        if timeout_event.is_set():
            raise TimedOut("TestMessage")
        return []

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)

    async with updater:
        await updater.start_polling()
        with caplog.at_level(logging.ERROR):
            timeout_event.set()
            await updater.stop()

    assert len(caplog.records) >= 1
    # Only the first matching record is inspected
    error_record = next(
        (
            record
            for record in caplog.records
            if record.getMessage().startswith(
                "Error while calling `get_updates` one more time"
            )
        ),
        None,
    )
    assert error_record is not None
    assert error_record.name == "telegram.ext.Updater"
    exc_type, exc_value = error_record.exc_info[0], error_record.exc_info[1]
    assert exc_type is TimedOut
    assert str(exc_value) == "TestMessage"
async def test_polling_mark_updates_as_read_failure(self, monkeypatch, updater, caplog):
    """`stop` must log a warning when no polling cleanup callback is available."""
    monkeypatch.setattr(updater.bot, "get_updates", empty_get_updates)

    async with updater:
        await updater.start_polling()
        # Unfortunately, there is no clean way to test this scenario as it should in fact
        # never happen - so the private (name-mangled) attribute is cleared by hand
        updater._Updater__polling_cleanup_cb = None

        with caplog.at_level(logging.DEBUG):
            await updater.stop()

    assert len(caplog.records) >= 1
    # Only the first matching record is inspected
    warning_record = next(
        (
            record
            for record in caplog.records
            if record.getMessage().startswith("No polling cleanup callback defined")
        ),
        None,
    )
    assert warning_record is not None
    assert warning_record.name == "telegram.ext.Updater"
    assert warning_record.levelno == logging.WARNING
async def test_start_polling_already_running(self, updater, monkeypatch):
    """A second `start_polling` and a second `stop` must both raise a `RuntimeError`."""
    monkeypatch.setattr(updater.bot, "get_updates", empty_get_updates)

    async with updater:
        await updater.start_polling()

        # The second start is scheduled as its own task, the error surfaces when awaiting it
        second_start = asyncio.create_task(updater.start_polling())
        with pytest.raises(RuntimeError, match="already running"):
            await second_start

        await updater.stop()
        with pytest.raises(RuntimeError, match="not running"):
            await updater.stop()
async def test_start_polling_get_updates_parameters(self, updater, monkeypatch):
    """`start_polling` must hand its arguments (or the defaults) on to `get_updates`."""
    update_queue = asyncio.Queue()
    await update_queue.put(Update(update_id=1))
    on_stop_flag = False

    # First round: nothing is passed to `start_polling`, so these are the expected defaults
    expected = {
        "timeout": 10,
        "read_timeout": DEFAULT_NONE,
        "write_timeout": DEFAULT_NONE,
        "connect_timeout": DEFAULT_NONE,
        "pool_timeout": DEFAULT_NONE,
        "allowed_updates": None,
        "api_kwargs": None,
    }

    async def get_updates(*args, **kwargs):
        if on_stop_flag:
            # This is tested in test_polling_mark_updates_as_read
            await asyncio.sleep(0)
            return []

        for key, value in expected.items():
            assert kwargs.pop(key, None) == value

        offset = kwargs.pop("offset", None)
        # Check that we don't get any unexpected kwargs
        assert kwargs == {}

        if offset is not None and self.message_count != 0:
            assert offset == self.message_count + 1, "get_updates got wrong `offset` parameter"

        if update_queue.empty():
            await asyncio.sleep(0)
            return []

        update = await update_queue.get()
        # Remember the id so that the next call can verify the offset
        self.message_count = update.update_id
        update_queue.task_done()
        return [update]

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)

    async with updater:
        await updater.start_polling()
        await update_queue.join()
        on_stop_flag = True
        await updater.stop()

        # Second round: explicit values for every parameter
        on_stop_flag = False
        custom_arguments = {
            "timeout": 42,
            "read_timeout": 43,
            "write_timeout": 44,
            "connect_timeout": 45,
            "pool_timeout": 46,
            "allowed_updates": ["message"],
        }
        expected = {**custom_arguments, "api_kwargs": None}

        await update_queue.put(Update(update_id=2))
        await updater.start_polling(**custom_arguments)
        await update_queue.join()
        on_stop_flag = True
        await updater.stop()
2023-03-25 19:18:04 +01:00
@pytest.mark.parametrize("exception_class", [InvalidToken, TelegramError])
@pytest.mark.parametrize("retries", [3, 0])
async def test_start_polling_bootstrap_retries(
    self, updater, monkeypatch, exception_class, retries
):
    """Bootstrapping must retry on `TelegramError` but give up at once on `InvalidToken`."""

    async def delete_webhook(*args, **kwargs):
        # The error message carries the number of the attempt that raised it
        self.message_count += 1
        raise exception_class(str(self.message_count))

    monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)

    # An invalid token is expected to surface on the first attempt; any other error only
    # once the initial attempt and all retries have been used up
    if exception_class is InvalidToken:
        expected_attempt = "1"
    else:
        expected_attempt = str(retries + 1)

    async with updater:
        with pytest.raises(exception_class, match=expected_attempt):
            await updater.start_polling(bootstrap_retries=retries)
@pytest.mark.parametrize(
    ("error", "callback_should_be_called"),
    argvalues=[
        (TelegramError("TestMessage"), True),
        (RetryAfter(1), False),
        (TimedOut("TestMessage"), False),
    ],
    ids=("TelegramError", "RetryAfter", "TimedOut"),
)
@pytest.mark.parametrize("custom_error_callback", [True, False])
async def test_start_polling_exceptions_and_error_callback(
    self, monkeypatch, updater, error, callback_should_be_called, custom_error_callback, caplog
):
    """Errors raised by `get_updates` must be reported as expected and not stop polling."""
    raise_exception = True
    get_updates_event = asyncio.Event()
    second_get_updates_event = asyncio.Event()

    async def get_updates(*args, **kwargs):
        # So that the main task has a chance to be called
        await asyncio.sleep(0)

        if get_updates_event.is_set():
            second_get_updates_event.set()

        if raise_exception:
            get_updates_event.set()
            raise error
        return []

    def assert_error_was_reported():
        """Check the custom callback or the log, depending on the parametrization."""
        if not callback_should_be_called:
            return
        if custom_error_callback:
            assert self.received == error
            return
        assert len(caplog.records) > 0
        assert any(
            "Exception happened while polling for updates." in record.getMessage()
            and record.name == "telegram.ext.Updater"
            for record in caplog.records
        )

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)
    monkeypatch.setattr(updater.bot, "set_webhook", lambda *args, **kwargs: True)

    with pytest.raises(TypeError, match="`error_callback` must not be a coroutine function"):
        await updater.start_polling(error_callback=get_updates)

    async with updater:
        self.err_handler_called = asyncio.Event()
        start_kwargs = {"error_callback": self.error_callback} if custom_error_callback else {}

        with caplog.at_level(logging.ERROR):
            await updater.start_polling(**start_kwargs)

            # Wait until get_updates has raised for the first time
            await get_updates_event.wait()
            # wait for get_updates to be called a second time - only now we can expect that
            # all error handling for the previous call has finished
            await second_get_updates_event.wait()

            assert_error_was_reported()

            # Make sure that get_updates was called
            assert get_updates_event.is_set()

        # Make sure that Updater polling keeps running
        self.err_handler_called.clear()
        get_updates_event.clear()
        caplog.clear()

        # get_updates has to raise once more after the reset
        await get_updates_event.wait()

        assert_error_was_reported()

        raise_exception = False
        await updater.stop()
async def test_start_polling_unexpected_shutdown(self, updater, monkeypatch, caplog):
    """If the updater stops running mid-request, this is logged and the offset not moved."""
    update_queue = asyncio.Queue()
    await update_queue.put(Update(update_id=1))
    first_update_event = asyncio.Event()
    second_update_event = asyncio.Event()

    async def get_updates(*args, **kwargs):
        # Keep track of the offset that the updater asked for
        self.message_count = kwargs.get("offset")
        update = await update_queue.get()
        first_update_event.set()
        # Hold the update back until the test has tampered with the updater
        await second_update_event.wait()
        return [update]

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)

    async with updater:
        with caplog.at_level(logging.ERROR):
            await updater.start_polling()

            await first_update_event.wait()
            # Unfortunately we need to use the private attribute here to produce the problem
            updater._running = False
            second_update_event.set()

            await asyncio.sleep(1)

            assert caplog.records
            matching = [
                record
                for record in caplog.records
                if "Updater stopped unexpectedly." in record.getMessage()
                and record.name == "telegram.ext.Updater"
            ]
            assert any(matching)

        # Make sure that the update_id offset wasn't increased
        assert self.message_count < 1
async def test_start_polling_not_running_after_failure(self, updater, monkeypatch):
    """If starting to poll fails, the updater must not report itself as running."""

    # Unfortunately we have to use some internal logic to trigger an exception
    async def failing_start(*args, **kwargs):
        raise Exception("Test Exception")

    monkeypatch.setattr(Updater, "_start_polling", failing_start)

    async with updater:
        with pytest.raises(Exception, match="Test Exception"):
            await updater.start_polling()

        assert updater.running is False
async def test_polling_update_de_json_fails(self, monkeypatch, updater, caplog):
    """A `TypeError` from `get_updates` is logged as critical; polling recovers afterwards."""
    updates = asyncio.Queue()
    raise_exception = True
    await updates.put(Update(update_id=1))

    async def get_updates(*args, **kwargs):
        if raise_exception:
            await asyncio.sleep(0.01)
            raise TypeError("Invalid Data")

        if updates.empty():
            await asyncio.sleep(0)
            return []

        next_update = await updates.get()
        updates.task_done()
        return [next_update]

    orig_del_webhook = updater.bot.delete_webhook

    async def delete_webhook(*args, **kwargs):
        # Dropping pending updates is done by passing the parameter to delete_webhook
        if kwargs.get("drop_pending_updates"):
            self.message_count += 1
        return await orig_del_webhook(*args, **kwargs)

    monkeypatch.setattr(updater.bot, "get_updates", get_updates)
    monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)

    async with updater:
        # Only critical records are of interest while get_updates keeps failing
        with caplog.at_level(logging.CRITICAL):
            await updater.start_polling()
            assert updater.running
            await asyncio.sleep(1)

        critical_records = caplog.records
        assert len(critical_records) > 0
        for record in critical_records:
            assert record.getMessage().startswith("Something went wrong processing")
            assert record.name == "telegram.ext.Updater"

        # Make sure that everything works fine again when receiving proper updates
        raise_exception = False
        await asyncio.sleep(0.5)
        caplog.clear()
        with caplog.at_level(logging.CRITICAL):
            await updates.join()
        assert len(caplog.records) == 0

        await updater.stop()
        assert not updater.running
2016-08-06 14:47:45 +02:00
@pytest.mark.parametrize("ext_bot", [True, False])
@pytest.mark.parametrize("drop_pending_updates", [True, False])
@pytest.mark.parametrize("secret_token", ["SecretToken", None])
@pytest.mark.parametrize(
    "unix", [None, "file_path", "socket_object"] if UNIX_AVAILABLE else [None]
)
async def test_webhook_basic(
    self,
    monkeypatch,
    updater,
    drop_pending_updates,
    ext_bot,
    secret_token,
    unix,
    file_path,
    one_time_bot,
    one_time_raw_bot,
):
    """End-to-end check of the webhook server: delivery, error responses and restarting."""
    # Testing with both ExtBot and Bot to make sure any logic in WebhookHandler
    # that depends on this distinction works
    if ext_bot and not isinstance(updater.bot, ExtBot):
        updater.bot = one_time_bot
    if not ext_bot and type(updater.bot) is not Bot:
        updater.bot = one_time_raw_bot

    async def delete_webhook(*args, **kwargs):
        # Dropping pending updates is done by passing the parameter to delete_webhook
        if kwargs.get("drop_pending_updates"):
            self.message_count += 1
        return True

    monkeypatch.setattr(updater.bot, "set_webhook", return_true)
    monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)

    ip = "127.0.0.1"
    port = randrange(1024, 49152)  # Select random port

    # Arguments that every `start_webhook` call of this test has in common
    shared_start_kwargs = {
        "drop_pending_updates": drop_pending_updates,
        "secret_token": secret_token,
        "webhook_url": "string",
    }
    tcp_start_kwargs = {
        **shared_start_kwargs,
        "ip_address": ip,
        "port": port,
        "url_path": "TOKEN",
    }
    unix_path = file_path if unix else None

    def unix_target():
        """Return the socket path or a freshly bound socket object, per parametrization."""
        return file_path if unix == "file_path" else bind_unix_socket(file_path)

    async def post(payload, url_path, **extra):
        """Send one request to the server under test."""
        return await send_webhook_message(
            ip, port, payload, url_path, unix=unix_path, **extra
        )

    async with updater:
        if unix:
            socket = unix_target()
            return_value = await updater.start_webhook(
                **shared_start_kwargs, url_path="TOKEN", unix=socket
            )
        else:
            return_value = await updater.start_webhook(**tcp_start_kwargs)
        assert return_value is updater.update_queue
        assert updater.running

        # Now, we send an update to the server
        update = make_message_update("Webhook")
        await post(update.to_json(), "TOKEN", secret_token=secret_token)
        assert (await updater.update_queue.get()).to_dict() == update.to_dict()

        # Returns Not Found if path is incorrect
        response = await post("123456", "webhook_handler.py")
        assert response.status_code == HTTPStatus.NOT_FOUND

        # Returns METHOD_NOT_ALLOWED if method is not allowed
        response = await post(None, "TOKEN", get_method="HEAD")
        assert response.status_code == HTTPStatus.METHOD_NOT_ALLOWED

        if secret_token:
            # Returns Forbidden if no secret token is set
            response = await post(update.to_json(), "TOKEN")
            assert response.status_code == HTTPStatus.FORBIDDEN
            assert response.text == self.response_text.format(
                "Request did not include the secret token", HTTPStatus.FORBIDDEN
            )

            # Returns Forbidden if the secret token is wrong
            response = await post(
                update.to_json(), "TOKEN", secret_token="NotTheSecretToken"
            )
            assert response.status_code == HTTPStatus.FORBIDDEN
            assert response.text == self.response_text.format(
                "Request had the wrong secret token", HTTPStatus.FORBIDDEN
            )

        await updater.stop()
        assert not updater.running

        # Exactly one `delete_webhook` call asked to drop updates - or none at all
        assert self.message_count == (1 if drop_pending_updates else 0)

        # We call the same logic twice to make sure that restarting the updater works as well
        if unix:
            socket = unix_target()
            await updater.start_webhook(**shared_start_kwargs, unix=socket)
        else:
            await updater.start_webhook(**tcp_start_kwargs)
        assert updater.running

        update = make_message_update("Webhook")
        # The unix restart above does not pass a URL path, so the root path is used there
        await post(update.to_json(), "" if unix else "TOKEN", secret_token=secret_token)
        assert (await updater.update_queue.get()).to_dict() == update.to_dict()

        await updater.stop()
        assert not updater.running
async def test_unix_webhook_mutually_exclusive_params(self, updater):
    """`unix` must be rejected together with `listen`/`port` and without a webhook URL."""
    # Pairs of the expected error message and the offending `start_webhook` arguments
    invalid_combinations = (
        ("You can not pass unix and listen", {"listen": "127.0.0.1", "unix": "DoesntMatter"}),
        ("You can not pass unix and port", {"port": 20, "unix": "DoesntMatter"}),
        ("you set unix, you also need to set the URL", {"unix": "DoesntMatter"}),
    )

    async with updater:
        for message, arguments in invalid_combinations:
            with pytest.raises(RuntimeError, match=message):
                await updater.start_webhook(**arguments)
@pytest.mark.skipif(
    platform.system() != "Windows",
    reason="Windows is the only platform without unix",
)
async def test_no_unix(self, updater):
    """Requesting a unix socket on Windows must raise a `RuntimeError`."""
    start_arguments = {"unix": "DoesntMatter", "webhook_url": "TOKEN"}

    async with updater:
        with pytest.raises(RuntimeError, match="binding unix sockets."):
            await updater.start_webhook(**start_arguments)
async def test_start_webhook_already_running(self, updater, monkeypatch):
    """A second `start_webhook` and a second `stop` must both raise a `RuntimeError`."""
    ip = "127.0.0.1"
    port = randrange(1024, 49152)  # Select random port

    async with updater:
        await updater.start_webhook(ip, port, url_path="TOKEN")

        # The second start is scheduled as its own task, the error surfaces when awaiting it
        second_start = asyncio.create_task(updater.start_webhook(ip, port, url_path="TOKEN"))
        with pytest.raises(RuntimeError, match="already running"):
            await second_start

        await updater.stop()
        with pytest.raises(RuntimeError, match="not running"):
            await updater.stop()
async def test_start_webhook_parameters_passing(self, updater, monkeypatch):
    """`start_webhook` must hand its arguments on to `set_webhook` and `delete_webhook`."""
    # First round: nothing is passed to `start_webhook`, so these are the expected defaults
    expected_delete_webhook = {"drop_pending_updates": None}
    expected_set_webhook = {
        "certificate": None,
        "max_connections": 40,
        "allowed_updates": None,
        "ip_address": None,
        "secret_token": None,
        **expected_delete_webhook,
    }

    async def set_webhook(*args, **kwargs):
        for key, value in expected_set_webhook.items():
            assert kwargs.pop(key, None) == value, f"set, {key}, {value}"

        # Only the URL may be left over - one candidate per `start_webhook` call below
        assert kwargs in (
            {"url": "http://127.0.0.1:80/"},
            {"url": "http://listen:80/"},
            {"url": "https://listen-ssl:42/ssl-path"},
        )
        return True

    async def delete_webhook(*args, **kwargs):
        for key, value in expected_delete_webhook.items():
            assert kwargs.pop(key, None) == value, f"delete, {key}, {value}"

        assert kwargs == {}
        return True

    async def serve_forever(*args, **kwargs):
        # Only signal readiness instead of actually serving
        kwargs.get("ready").set()

    monkeypatch.setattr(updater.bot, "set_webhook", set_webhook)
    monkeypatch.setattr(updater.bot, "delete_webhook", delete_webhook)
    monkeypatch.setattr(WebhookServer, "serve_forever", serve_forever)

    async with updater:
        await updater.start_webhook()
        await updater.stop()

        # Second and third round: explicit values, once with a certificate only and once
        # with certificate and key
        expected_delete_webhook = {
            "drop_pending_updates": True,
            "api_kwargs": None,
        }
        expected_set_webhook = {
            "certificate": data_file("sslcert.pem").read_bytes(),
            "max_connections": 47,
            "allowed_updates": ["message"],
            "ip_address": "123.456.789",
            "secret_token": None,
            **expected_delete_webhook,
        }
        shared_arguments = {
            "allowed_updates": ["message"],
            "drop_pending_updates": True,
            "ip_address": "123.456.789",
            "max_connections": 47,
        }

        await updater.start_webhook(
            listen="listen",
            cert=str(data_file("sslcert.pem").resolve()),
            **shared_arguments,
        )
        await updater.stop()

        await updater.start_webhook(
            listen="listen-ssl",
            port=42,
            url_path="ssl-path",
            cert=data_file("sslcert.pem"),
            key=data_file("sslcert.key"),
            **shared_arguments,
        )
        await updater.stop()
@pytest.mark.parametrize("invalid_data", [True, False], ids=("invalid data", "valid data"))
async def test_webhook_arbitrary_callback_data(
    self, monkeypatch, cdc_bot, invalid_data, chat_id
):
    """Here we only test one simple setup. telegram.ext.ExtBot.insert_callback_data is tested
    extensively in test_bot.py in conjunction with get_updates."""
    updater = Updater(bot=cdc_bot, update_queue=asyncio.Queue())
    monkeypatch.setattr(updater.bot, "set_webhook", return_true)
    cache = updater.bot.callback_data_cache

    try:
        ip = "127.0.0.1"
        port = randrange(1024, 49152)  # Select random port

        async with updater:
            await updater.start_webhook(ip, port, url_path="TOKEN")

            # Build the keyboard; only in the valid case it is registered with the cache
            keyboard = InlineKeyboardMarkup.from_button(
                InlineKeyboardButton(text="text", callback_data="callback_data")
            )
            if not invalid_data:
                keyboard = updater.bot.callback_data_cache.process_keyboard(keyboard)

            # Now, we send an update to the server
            update = make_message_update(
                message="test_webhook_arbitrary_callback_data",
                message_factory=make_message,
                reply_markup=keyboard,
                user=updater.bot.bot,
            )
            await send_webhook_message(ip, port, update.to_json(), "TOKEN")
            received_update = await updater.update_queue.get()

            assert received_update.update_id == update.update_id

            # Apart from the reply markup, both messages have to be identical
            sent_message = update.message.to_dict()
            received_message = received_update.message.to_dict()
            for message_dict in (sent_message, received_message):
                message_dict.pop("reply_markup")
            assert sent_message == received_message

            button = received_update.message.reply_markup.inline_keyboard[0][0]
            if invalid_data:
                assert isinstance(button.callback_data, InvalidCallbackData)
            else:
                assert button.callback_data == "callback_data"

            await updater.stop()
    finally:
        # Do not leak cached data into other tests
        cache.clear_callback_data()
        cache.clear_callback_queries()
async def test_webhook_invalid_ssl(self, monkeypatch, updater):
    """Starting the webhook with an unusable certificate/key pair must fail.

    This test module itself is handed over as both ``cert`` and ``key``. The call is
    expected to raise a ``TelegramError`` whose message matches "Invalid SSL", and the
    updater must not be running afterwards.
    """
    # This very source file doubles as the (bogus) certificate and private key.
    bogus_pem = Path(__file__).as_posix()
    host = "127.0.0.1"
    free_port = randrange(1024, 49152)  # Select random port

    webhook_options = {
        "url_path": "TOKEN",
        "cert": bogus_pem,
        "key": bogus_pem,
        "bootstrap_retries": 0,
        "drop_pending_updates": False,
        "webhook_url": None,
        "allowed_updates": None,
    }

    async with updater:
        with pytest.raises(TelegramError, match="Invalid SSL"):
            await updater.start_webhook(host, free_port, **webhook_options)

        assert updater.running is False
async def test_webhook_ssl_just_for_telegram(self, monkeypatch, updater):
    """Here we just test that the SSL info is passed to Telegram, but __not__ to the
    webhook server.

    Both observations are recorded in ``self.test_flag``: the patched ``WebhookServer``
    initializer stores whether it was built without ``ssl_ctx``, and the patched
    ``set_webhook`` appends whether it received a truthy ``certificate``.
    """
    real_server_init = WebhookServer.__init__

    def recording_server_init(*args, **kwargs):
        # Start a fresh flag list: True means no SSL context reached the server.
        self.test_flag = [kwargs.get("ssl_ctx") is None]
        real_server_init(*args, **kwargs)

    async def recording_set_webhook(**kwargs):
        # True means a certificate was forwarded to the (mocked) Telegram call.
        got_certificate = bool(kwargs.get("certificate"))
        self.test_flag.append(got_certificate)
        return True

    monkeypatch.setattr(updater.bot, "set_webhook", recording_set_webhook)
    monkeypatch.setattr(
        "telegram.ext._utils.webhookhandler.WebhookServer.__init__", recording_server_init
    )

    host = "127.0.0.1"
    free_port = randrange(1024, 49152)  # Select random port
    cert_path = Path(__file__).as_posix()

    async with updater:
        await updater.start_webhook(host, free_port, webhook_url=None, cert=cert_path)

        # Now, we send an update to the server
        sent_update = make_message_update(message="test_message")
        await send_webhook_message(host, free_port, sent_update.to_json())
        received_update = await updater.update_queue.get()

        assert received_update.to_dict() == sent_update.to_dict()
        assert self.test_flag == [True, True]

        await updater.stop()
2023-03-25 19:18:04 +01:00
@pytest.mark.parametrize("exception_class", [InvalidToken, TelegramError])
@pytest.mark.parametrize("retries", [3, 0])
async def test_start_webhook_bootstrap_retries(
    self, updater, monkeypatch, exception_class, retries
):
    """Check how often ``set_webhook`` is attempted while bootstrapping the webhook.

    The patched ``set_webhook`` increments ``self.message_count`` on every call and
    raises ``exception_class`` with the current count as its message, so the message of
    the error that surfaces from ``start_webhook`` tells how many attempts were made.
    For ``InvalidToken`` the test expects a message matching ``"1"``; for a plain
    ``TelegramError`` it expects one matching ``retries + 1``.
    """

    async def always_failing_set_webhook(*args, **kwargs):
        self.message_count += 1
        raise exception_class(str(self.message_count))

    monkeypatch.setattr(updater.bot, "set_webhook", always_failing_set_webhook)

    # Decide up front which error and which attempt count should surface.
    if exception_class is InvalidToken:
        expected_error, expected_count = InvalidToken, "1"
    else:
        expected_error, expected_count = TelegramError, str(retries + 1)

    async with updater:
        with pytest.raises(expected_error, match=expected_count):
            await updater.start_webhook(bootstrap_retries=retries)
async def test_webhook_invalid_posts(self, updater, monkeypatch):
    """Send malformed POST requests to the webhook and check the returned status codes.

    Covered cases:

    * an unknown content type and an XML payload are both expected to be answered
      with ``HTTPStatus.FORBIDDEN``
    * a request sent with ``content_len=None`` is expected to be answered with
      ``HTTPStatus.INTERNAL_SERVER_ERROR``
    """
    ip = "127.0.0.1"
    port = randrange(1024, 49152)  # Select random port

    async with updater:
        await updater.start_webhook(listen=ip, port=port)

        # No payload, unknown content type
        response = await send_webhook_message(ip, port, None, content_type="invalid")
        assert response.status_code == HTTPStatus.FORBIDDEN

        # XML instead of JSON
        response = await send_webhook_message(
            ip,
            port,
            payload_str="<root><bla>data</bla></root>",
            content_type="application/xml",
        )
        assert response.status_code == HTTPStatus.FORBIDDEN

        # Request sent with ``content_len=None``
        response = await send_webhook_message(ip, port, "dummy-payload", content_len=None)
        assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR

        # httpx already complains about bad content length in _send_webhook_message
        # before the requests below reach the webhook, but not testing this is probably
        # okay
        # response = await send_webhook_message(
        #   ip, port, 'dummy-payload', content_len=-2)
        # assert response.status_code == HTTPStatus.FORBIDDEN
        # response = await send_webhook_message(
        #   ip, port, 'dummy-payload', content_len='not-a-number')
        # assert response.status_code == HTTPStatus.INTERNAL_SERVER_ERROR

        await updater.stop()
async def test_webhook_update_de_json_fails(self, monkeypatch, updater, caplog):
    """Check how the webhook reacts when an incoming update cannot be deserialized.

    ``Update.de_json`` is patched to raise a ``TypeError``. For an update posted in that
    state the test expects exactly one record captured at ``CRITICAL`` level from the
    ``telegram.ext.Updater`` logger and an HTTP 400 response. After the original
    ``de_json`` is put back, a posted update must arrive in the update queue without
    any record being captured.
    """

    def de_json_fails(*args, **kwargs):
        raise TypeError("Invalid input")

    # Keep a handle on the real implementation so it can be restored mid-test.
    orig_de_json = Update.de_json
    monkeypatch.setattr(Update, "de_json", de_json_fails)

    ip = "127.0.0.1"
    port = randrange(1024, 49152)  # Select random port

    async with updater:
        # Bind explicitly to the address the requests below are sent to. ``listen`` is
        # the bind address of the webhook server, whereas ``ip_address`` would merely
        # be forwarded to ``Bot.set_webhook``.
        return_value = await updater.start_webhook(
            listen=ip,
            port=port,
            url_path="TOKEN",
        )
        assert return_value is updater.update_queue
        assert updater.running

        # Now, we send an update to the server
        update = make_message_update("Webhook")
        with caplog.at_level(logging.CRITICAL):
            response = await send_webhook_message(ip, port, update.to_json(), "TOKEN")

        assert len(caplog.records) == 1
        assert caplog.records[-1].getMessage().startswith("Something went wrong processing")
        assert "Received data was: {" in caplog.records[-1].getMessage()
        assert caplog.records[-1].name == "telegram.ext.Updater"
        assert response.status_code == HTTPStatus.BAD_REQUEST
        assert response.text == self.response_text.format(
            "Update could not be processed", HTTPStatus.BAD_REQUEST
        )

        # Make sure that everything works fine again when receiving proper updates
        caplog.clear()
        with caplog.at_level(logging.CRITICAL):
            monkeypatch.setattr(Update, "de_json", orig_de_json)
            await send_webhook_message(ip, port, update.to_json(), "TOKEN")
            assert (await updater.update_queue.get()).to_dict() == update.to_dict()
        assert len(caplog.records) == 0

        await updater.stop()
        assert not updater.running