Improve Updater.set_webhook (#2419)

* Get started

* tests

* Some doc fixes

* Some doc fixes
This commit is contained in:
Bibo-Joshi 2021-03-13 15:35:26 +01:00 committed by GitHub
parent 038a3b4452
commit aba17cb997
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 207 additions and 96 deletions

View file

@ -244,36 +244,56 @@ class Updater:
self,
poll_interval: float = 0.0,
timeout: float = 10,
clean: bool = False,
clean: bool = None,
bootstrap_retries: int = -1,
read_latency: float = 2.0,
allowed_updates: List[str] = None,
drop_pending_updates: bool = None,
) -> Optional[Queue]:
"""Starts polling updates from Telegram.
Args:
poll_interval (:obj:`float`, optional): Time to wait between polling updates from
Telegram in seconds. Default is 0.0.
timeout (:obj:`float`, optional): Passed to :attr:`telegram.Bot.get_updates`.
clean (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers
before actually starting to poll. Default is :obj:`False`.
timeout (:obj:`float`, optional): Passed to :meth:`telegram.Bot.get_updates`.
drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
Telegram servers before actually starting to poll. Default is :obj:`False`.
.. versionadded :: 13.4
clean (:obj:`bool`, optional): Alias for ``drop_pending_updates``.
.. deprecated:: 13.4
Use ``drop_pending_updates`` instead.
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
`Updater` will retry on failures on the Telegram server.
:class:`telegram.ext.Updater` will retry on failures on the Telegram server.
* < 0 - retry indefinitely (default)
* 0 - no retries
* > 0 - retry up to X times
allowed_updates (List[:obj:`str`], optional): Passed to
:attr:`telegram.Bot.get_updates`.
:meth:`telegram.Bot.get_updates`.
read_latency (:obj:`float` | :obj:`int`, optional): Grace time in seconds for receiving
the reply from server. Will be added to the `timeout` value and used as the read
the reply from server. Will be added to the ``timeout`` value and used as the read
timeout from server (Default: 2).
Returns:
:obj:`Queue`: The update queue that can be filled from the main thread.
"""
if (clean is not None) and (drop_pending_updates is not None):
raise TypeError('`clean` and `drop_pending_updates` are mutually exclusive.')
if clean is not None:
warnings.warn(
'The argument `clean` of `start_polling` is deprecated. Please use '
'`drop_pending_updates` instead.',
category=TelegramDeprecationWarning,
stacklevel=2,
)
drop_pending_updates = drop_pending_updates if drop_pending_updates is not None else clean
with self.__lock:
if not self.running:
self.running = True
@ -290,7 +310,7 @@ class Updater:
timeout,
read_latency,
bootstrap_retries,
clean,
drop_pending_updates,
allowed_updates,
ready=polling_ready,
)
@ -310,18 +330,20 @@ class Updater:
url_path: str = '',
cert: str = None,
key: str = None,
clean: bool = False,
clean: bool = None,
bootstrap_retries: int = 0,
webhook_url: str = None,
allowed_updates: List[str] = None,
force_event_loop: bool = False,
drop_pending_updates: bool = None,
ip_address: str = None,
) -> Optional[Queue]:
"""
Starts a small http server to listen for updates via webhook. If cert
and key are not provided, the webhook will be started directly on
http://listen:port/url_path, so SSL can be handled by another
application. Else, the webhook will be started on
https://listen:port/url_path
https://listen:port/url_path. Also calls :meth:`telegram.Bot.set_webhook` as required.
Note:
Due to an incompatibility of the Tornado library PTB uses for the webhook with Python
@ -338,19 +360,29 @@ class Updater:
url_path (:obj:`str`, optional): Path inside url.
cert (:obj:`str`, optional): Path to the SSL certificate file.
key (:obj:`str`, optional): Path to the SSL key file.
clean (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers
before actually starting the webhook. Default is :obj:`False`.
drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
Telegram servers before actually starting to poll. Default is :obj:`False`.
.. versionadded :: 13.4
clean (:obj:`bool`, optional): Alias for ``drop_pending_updates``.
.. deprecated:: 13.4
Use ``drop_pending_updates`` instead.
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
`Updater` will retry on failures on the Telegram server.
:class:`telegram.ext.Updater` will retry on failures on the Telegram server.
* < 0 - retry indefinitely (default)
* 0 - no retries
* > 0 - retry up to X times
webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind
NAT, reverse proxy, etc. Default is derived from `listen`, `port` & `url_path`.
NAT, reverse proxy, etc. Default is derived from ``listen``, ``port`` &
``url_path``.
ip_address (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
.. versionadded :: 13.4
allowed_updates (List[:obj:`str`], optional): Passed to
:attr:`telegram.Bot.set_webhook`.
:meth:`telegram.Bot.set_webhook`.
force_event_loop (:obj:`bool`, optional): Force using the current event loop. See above
note for details. Defaults to :obj:`False`
@ -358,6 +390,19 @@ class Updater:
:obj:`Queue`: The update queue that can be filled from the main thread.
"""
if (clean is not None) and (drop_pending_updates is not None):
raise TypeError('`clean` and `drop_pending_updates` are mutually exclusive.')
if clean is not None:
warnings.warn(
'The argument `clean` of `start_webhook` is deprecated. Please use '
'`drop_pending_updates` instead.',
category=TelegramDeprecationWarning,
stacklevel=2,
)
drop_pending_updates = drop_pending_updates if drop_pending_updates is not None else clean
with self.__lock:
if not self.running:
self.running = True
@ -376,11 +421,12 @@ class Updater:
cert,
key,
bootstrap_retries,
clean,
drop_pending_updates,
webhook_url,
allowed_updates,
ready=webhook_ready,
force_event_loop=force_event_loop,
ip_address=ip_address,
)
self.logger.debug('Waiting for Dispatcher and Webhook to start')
@ -398,7 +444,7 @@ class Updater:
timeout,
read_latency,
bootstrap_retries,
clean,
drop_pending_updates,
allowed_updates,
ready=None,
): # pragma: no cover
@ -408,7 +454,12 @@ class Updater:
self.logger.debug('Updater thread started (polling)')
self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None)
self._bootstrap(
bootstrap_retries,
drop_pending_updates=drop_pending_updates,
webhook_url='',
allowed_updates=None,
)
self.logger.debug('Bootstrap done')
@ -504,14 +555,20 @@ class Updater:
cert,
key,
bootstrap_retries,
clean,
drop_pending_updates,
webhook_url,
allowed_updates,
ready=None,
force_event_loop=False,
ip_address=None,
):
self.logger.debug('Updater thread started (webhook)')
# Note that we only use the SSL certificate for the WebhookServer, if the key is also
# present. This is because the WebhookServer may not actually be in charge of performing
# the SSL handshake, e.g. in case a reverse proxy is used
use_ssl = cert is not None and key is not None
if not url_path.startswith('/'):
url_path = f'/{url_path}'
@ -532,23 +589,18 @@ class Updater:
# Create and start server
self.httpd = WebhookServer(listen, port, app, ssl_ctx)
if use_ssl:
# DO NOT CHANGE: Only set webhook if SSL is handled by library
if not webhook_url:
webhook_url = self._gen_webhook_url(listen, port, url_path)
if not webhook_url:
webhook_url = self._gen_webhook_url(listen, port, url_path)
self._bootstrap(
max_retries=bootstrap_retries,
clean=clean,
webhook_url=webhook_url,
cert=open(cert, 'rb'),
allowed_updates=allowed_updates,
)
elif clean:
self.logger.warning(
"cleaning updates is not supported if "
"SSL-termination happens elsewhere; skipping"
)
# We pass along the cert to the webhook if present.
self._bootstrap(
max_retries=bootstrap_retries,
drop_pending_updates=drop_pending_updates,
webhook_url=webhook_url,
allowed_updates=allowed_updates,
cert=open(cert, 'rb') if cert is not None else None,
ip_address=ip_address,
)
self.httpd.serve_forever(force_event_loop=force_event_loop, ready=ready)
@ -558,24 +610,34 @@ class Updater:
@no_type_check
def _bootstrap(
self, max_retries, clean, webhook_url, allowed_updates, cert=None, bootstrap_interval=5
self,
max_retries,
drop_pending_updates,
webhook_url,
allowed_updates,
cert=None,
bootstrap_interval=5,
ip_address=None,
):
retries = [0]
def bootstrap_del_webhook():
self.bot.delete_webhook()
return False
def bootstrap_clean_updates():
self.logger.debug('Cleaning updates from Telegram server')
updates = self.bot.get_updates()
while updates:
updates = self.bot.get_updates(updates[-1].update_id + 1)
self.logger.debug('Deleting webhook')
if drop_pending_updates:
self.logger.debug('Dropping pending updates from Telegram server')
self.bot.delete_webhook(drop_pending_updates=drop_pending_updates)
return False
def bootstrap_set_webhook():
self.logger.debug('Setting webhook')
if drop_pending_updates:
self.logger.debug('Dropping pending updates from Telegram server')
self.bot.set_webhook(
url=webhook_url, certificate=cert, allowed_updates=allowed_updates
url=webhook_url,
certificate=cert,
allowed_updates=allowed_updates,
ip_address=ip_address,
drop_pending_updates=drop_pending_updates,
)
return False
@ -589,11 +651,11 @@ class Updater:
self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc)
raise exc
# Cleaning pending messages is done by polling for them - so we need to delete webhook if
# one is configured.
# We also take this chance to delete pre-configured webhook if this is a polling Updater.
# NOTE: We don't know ahead if a webhook is configured, so we just delete.
if clean or not webhook_url:
# Dropping pending updates from TG can be efficiently done with the drop_pending_updates
# parameter of delete/start_webhook, even in the case of polling. Also we want to make
# sure that no webhook is configured in case of polling, so we just always call
# delete_webhook for polling
if drop_pending_updates or not webhook_url:
self._network_loop_retry(
bootstrap_del_webhook,
bootstrap_onerr_cb,
@ -602,17 +664,6 @@ class Updater:
)
retries[0] = 0
# Clean pending messages, if requested.
if clean:
self._network_loop_retry(
bootstrap_clean_updates,
bootstrap_onerr_cb,
'bootstrap clean updates',
bootstrap_interval,
)
retries[0] = 0
sleep(1)
# Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set,
# so we set it anyhow.
if webhook_url:

View file

@ -72,6 +72,7 @@ class TestUpdater:
err_handler_called = Event()
cb_handler_called = Event()
offset = 0
test_flag = False
@pytest.fixture(autouse=True)
def reset(self):
@ -80,6 +81,7 @@ class TestUpdater:
self.attempts = 0
self.err_handler_called.clear()
self.cb_handler_called.clear()
self.test_flag = False
def error_handler(self, bot, update, error):
self.received = error.message
@ -247,7 +249,7 @@ class TestUpdater:
cert=None,
key=None,
bootstrap_retries=0,
clean=False,
drop_pending_updates=False,
webhook_url=None,
allowed_updates=None,
)
@ -274,7 +276,7 @@ class TestUpdater:
cert=None,
key=None,
bootstrap_retries=0,
clean=False,
drop_pending_updates=False,
webhook_url=None,
allowed_updates=None,
)
@ -307,7 +309,7 @@ class TestUpdater:
cert=None,
key=None,
bootstrap_retries=0,
clean=False,
drop_pending_updates=False,
webhook_url=None,
allowed_updates=None,
force_event_loop=True,
@ -328,7 +330,7 @@ class TestUpdater:
cert='./tests/test_updater.py',
key='./tests/test_updater.py',
bootstrap_retries=0,
clean=False,
drop_pending_updates=False,
webhook_url=None,
allowed_updates=None,
)
@ -357,6 +359,42 @@ class TestUpdater:
assert q.get(False) == update
updater.stop()
def test_webhook_ssl_just_for_telegram(self, monkeypatch, updater):
q = Queue()
def set_webhook(**kwargs):
self.test_flag.append(bool(kwargs.get('certificate')))
return True
orig_wh_server_init = WebhookServer.__init__
def webhook_server_init(*args):
self.test_flag = [args[-1] is None]
orig_wh_server_init(*args)
monkeypatch.setattr(updater.bot, 'set_webhook', set_webhook)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr('telegram.ext.Dispatcher.process_update', lambda _, u: q.put(u))
monkeypatch.setattr(
'telegram.ext.utils.webhookhandler.WebhookServer.__init__', webhook_server_init
)
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
updater.start_webhook(ip, port, webhook_url=None, cert='./tests/test_updater.py')
sleep(0.2)
# Now, we send an update to the server via urlopen
update = Update(
1,
message=Message(1, None, Chat(1, ''), from_user=User(1, '', False), text='Webhook 2'),
)
self._send_webhook_msg(ip, port, update.to_json())
sleep(0.2)
assert q.get(False) == update
updater.stop()
assert self.test_flag == [True, True]
@pytest.mark.parametrize(('error',), argvalues=[(TelegramError(''),)], ids=('TelegramError',))
def test_bootstrap_retries_success(self, monkeypatch, updater, error):
retries = 2
@ -391,41 +429,63 @@ class TestUpdater:
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
assert self.attempts == attempts
def test_bootstrap_clean_updates(self, monkeypatch, updater):
clean = True
expected_id = 4
self.offset = 0
@pytest.mark.parametrize('drop_pending_updates', (True, False))
def test_bootstrap_clean_updates(self, monkeypatch, updater, drop_pending_updates):
# As dropping pending updates is done by passing `drop_pending_updates` to
# set_webhook, we just check that we pass the correct value
self.test_flag = False
def get_updates(*args, **kwargs):
# we're hitting this func twice
# 1. no args, return list of updates
# 2. with 1 arg, int => if int == expected_id => test successful
def delete_webhook(**kwargs):
self.test_flag = kwargs.get('drop_pending_updates') == drop_pending_updates
# case 2
# 2nd call from bootstrap____clean
# we should be called with offset = 4
# save value passed in self.offset for assert down below
if len(args) > 0:
self.offset = int(args[0])
return []
class FakeUpdate:
def __init__(self, update_id):
self.update_id = update_id
# case 1
# return list of obj's
# build list of fake updates
# returns list of 4 objects with
# update_id's 0, 1, 2 and 3
return [FakeUpdate(i) for i in range(0, expected_id)]
monkeypatch.setattr(updater.bot, 'get_updates', get_updates)
monkeypatch.setattr(updater.bot, 'delete_webhook', delete_webhook)
updater.running = True
updater._bootstrap(1, clean, None, None, bootstrap_interval=0)
assert self.offset == expected_id
updater._bootstrap(
1,
drop_pending_updates=drop_pending_updates,
webhook_url=None,
allowed_updates=None,
bootstrap_interval=0,
)
assert self.test_flag is True
def test_clean_deprecation_warning_webhook(self, recwarn, updater, monkeypatch):
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
# prevent api calls from @info decorator when updater.bot.id is used in thread names
monkeypatch.setattr(updater.bot, '_bot', User(id=123, first_name='bot', is_bot=True))
monkeypatch.setattr(updater.bot, '_commands', [])
ip = '127.0.0.1'
port = randrange(1024, 49152) # Select random port
updater.start_webhook(ip, port, clean=True)
updater.stop()
assert len(recwarn) == 2
assert str(recwarn[0].message).startswith('Old Handler API')
assert str(recwarn[1].message).startswith('The argument `clean` of')
def test_clean_deprecation_warning_polling(self, recwarn, updater, monkeypatch):
monkeypatch.setattr(updater.bot, 'set_webhook', lambda *args, **kwargs: True)
monkeypatch.setattr(updater.bot, 'delete_webhook', lambda *args, **kwargs: True)
# prevent api calls from @info decorator when updater.bot.id is used in thread names
monkeypatch.setattr(updater.bot, '_bot', User(id=123, first_name='bot', is_bot=True))
monkeypatch.setattr(updater.bot, '_commands', [])
updater.start_polling(clean=True)
updater.stop()
assert len(recwarn) == 2
for msg in recwarn:
print(msg)
assert str(recwarn[0].message).startswith('Old Handler API')
assert str(recwarn[1].message).startswith('The argument `clean` of')
def test_clean_drop_pending_mutually_exclusive(self, updater):
with pytest.raises(TypeError, match='`clean` and `drop_pending_updates` are mutually'):
updater.start_polling(clean=True, drop_pending_updates=False)
with pytest.raises(TypeError, match='`clean` and `drop_pending_updates` are mutually'):
updater.start_webhook(clean=True, drop_pending_updates=False)
@flaky(3, 1)
def test_webhook_invalid_posts(self, updater):