From a6bf4566459e035b521722bc98bf1f19cee131bc Mon Sep 17 00:00:00 2001 From: Noam Meltzer Date: Fri, 2 Mar 2018 23:11:16 +0200 Subject: [PATCH] Updater improvements (#1018) - Refactor bootstrap phase to be resilient for network errors - Retry bootstrap phase indefinitely (by default) on network errors - Improved logs - Improved unitests for polling updater fixes #605 --- telegram/ext/updater.py | 174 +++++++++++++++++++++++++--------------- tests/test_updater.py | 90 ++++++++++++++++++--- 2 files changed, 187 insertions(+), 77 deletions(-) diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index c711d1be7..77752ba17 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -149,7 +149,7 @@ class Updater(object): target(*args, **kwargs) except Exception: self.__exception_event.set() - self.logger.exception('unhandled exception') + self.logger.exception('unhandled exception in %s', thr_name) raise self.logger.debug('{0} - ended'.format(thr_name)) @@ -157,7 +157,7 @@ class Updater(object): poll_interval=0.0, timeout=10, clean=False, - bootstrap_retries=0, + bootstrap_retries=-1, read_latency=2., allowed_updates=None): """Starts polling updates from Telegram. @@ -171,8 +171,8 @@ class Updater(object): bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the `Updater` will retry on failures on the Telegram server. - * < 0 - retry indefinitely - * 0 - no retries (default) + * < 0 - retry indefinitely (default) + * 0 - no retries * > 0 - retry up to X times allowed_updates (List[:obj:`str`], optional): Passed to @@ -229,8 +229,8 @@ class Updater(object): bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the `Updater` will retry on failures on the Telegram server. - * < 0 - retry indefinitely - * 0 - no retries (default) + * < 0 - retry indefinitely (default) + * 0 - no retries * > 0 - retry up to X times webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind @@ -242,7 +242,6 @@ class Updater(object): :obj:`Queue`: The update queue that can be filled from the main thread. """ - with self.__lock: if not self.running: self.running = True @@ -262,46 +261,72 @@ class Updater(object): # updates from Telegram and inserts them in the update queue of the # Dispatcher. - cur_interval = poll_interval - self.logger.debug('Updater thread started') + self.logger.debug('Updater thread started (polling)') self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None) - while self.running: - try: - updates = self.bot.get_updates( - self.last_update_id, - timeout=timeout, - read_latency=read_latency, - allowed_updates=allowed_updates) - except RetryAfter as e: - self.logger.info(str(e)) - cur_interval = 0.5 + e.retry_after - except TimedOut as toe: - self.logger.debug('Timed out getting Updates: %s', toe) - # If get_updates() failed due to timeout, we should retry asap. - cur_interval = 0 - except TelegramError as te: - self.logger.error('Error while getting Updates: %s', te) + self.logger.debug('Bootstrap done') - # Put the error into the update queue and let the Dispatcher - # broadcast it - self.update_queue.put(te) + def polling_action_cb(): + updates = self.bot.get_updates( + self.last_update_id, timeout=timeout, read_latency=read_latency, + allowed_updates=allowed_updates) - cur_interval = self._increase_poll_interval(cur_interval) - else: + if updates: if not self.running: - if len(updates) > 0: - self.logger.debug('Updates ignored and will be pulled ' - 'again on restart.') - break - - if updates: + self.logger.debug('Updates ignored and will be pulled again on restart') + else: for update in updates: self.update_queue.put(update) self.last_update_id = updates[-1].update_id + 1 - cur_interval = poll_interval + return True + + def polling_onerr_cb(exc): + # Put the error into the update queue and let the Dispatcher + # broadcast it + self.update_queue.put(exc) + + self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates', + poll_interval) + + def _network_loop_retry(self, action_cb, onerr_cb, description, interval): + """Perform a loop calling `action_cb`, retrying after network errors. + + Stop condition for loop: `self.running` evaluates False or return value of `action_cb` + evaluates False. + + Args: + action_cb (:obj:`callable`): Network oriented callback function to call. + onerr_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the + exception object as a parameter. + description (:obj:`str`): Description text to use for logs and exception raised. + interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to + `action_cb`. + + """ + self.logger.debug('Start network loop retry %s', description) + cur_interval = interval + while self.running: + try: + if not action_cb(): + break + except RetryAfter as e: + self.logger.info('%s', e) + cur_interval = 0.5 + e.retry_after + except TimedOut as toe: + self.logger.debug('Timed out %s: %s', description, toe) + # If failure is due to timeout, we should retry asap. + cur_interval = 0 + except InvalidToken as pex: + self.logger.error('Invalid token; aborting') + raise pex + except TelegramError as te: + self.logger.error('Error while %s: %s', description, te) + onerr_cb(te) + cur_interval = self._increase_poll_interval(cur_interval) + else: + cur_interval = interval if cur_interval: sleep(cur_interval) @@ -319,7 +344,7 @@ class Updater(object): def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean, webhook_url, allowed_updates): - self.logger.debug('Updater thread started') + self.logger.debug('Updater thread started (webhook)') use_ssl = cert is not None and key is not None if not url_path.startswith('/'): url_path = '/{0}'.format(url_path) @@ -370,39 +395,56 @@ class Updater(object): def _gen_webhook_url(listen, port, url_path): return 'https://{listen}:{port}{path}'.format(listen=listen, port=port, path=url_path) - def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None): - retries = 0 - while 1: + def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None, + bootstrap_interval=5): + retries = [0] - try: - if clean: - # Disable webhook for cleaning - self.bot.delete_webhook() - self._clean_updates() - sleep(1) + def bootstrap_del_webhook(): + self.bot.delete_webhook() + return False - self.bot.set_webhook( - url=webhook_url, certificate=cert, allowed_updates=allowed_updates) - except (Unauthorized, InvalidToken): - raise - except TelegramError: - msg = 'error in bootstrap phase; try={0} max_retries={1}'.format(retries, - max_retries) - if max_retries < 0 or retries < max_retries: - self.logger.warning(msg) - retries += 1 - else: - self.logger.exception(msg) - raise + def bootstrap_clean_updates(): + self.logger.debug('Cleaning updates from Telegram server') + updates = self.bot.get_updates() + while updates: + updates = self.bot.get_updates(updates[-1].update_id + 1) + return False + + def bootstrap_set_webhook(): + self.bot.set_webhook( + url=webhook_url, certificate=cert, allowed_updates=allowed_updates) + return False + + def bootstrap_onerr_cb(exc): + if not isinstance(exc, Unauthorized) and (max_retries < 0 or retries[0] < max_retries): + retries[0] += 1 + self.logger.warning('Failed bootstrap phase; try=%s max_retries=%s', + retries[0], max_retries) else: - break + self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc) + raise exc + + # Cleaning pending messages is done by polling for them - so we need to delete webhook if + # one is configured. + # We also take this chance to delete pre-configured webhook if this is a polling Updater. + # NOTE: We don't know ahead if a webhook is configured, so we just delete. + if clean or not webhook_url: + self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb, + 'bootstrap del webhook', bootstrap_interval) + retries[0] = 0 + + # Clean pending messages, if requested. + if clean: + self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb, + 'bootstrap clean updates', bootstrap_interval) + retries[0] = 0 sleep(1) - def _clean_updates(self): - self.logger.debug('Cleaning updates from Telegram server') - updates = self.bot.get_updates() - while updates: - updates = self.bot.get_updates(updates[-1].update_id + 1) + # Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set, + # so we set it anyhow. + if webhook_url: + self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb, + 'bootstrap set webhook', bootstrap_interval) def stop(self): """Stops the polling/webhook thread, the dispatcher and the job queue.""" diff --git a/tests/test_updater.py b/tests/test_updater.py index 7cec00cce..85c4a6c99 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -23,7 +23,7 @@ import sys from functools import partial from queue import Queue from random import randrange -from threading import Thread +from threading import Thread, Event from time import sleep try: @@ -38,7 +38,7 @@ import pytest from future.builtins import bytes from telegram import TelegramError, Message, User, Chat, Update, Bot -from telegram.error import Unauthorized, InvalidToken +from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter from telegram.ext import Updater signalskip = pytest.mark.skipif(sys.platform == 'win32', @@ -58,31 +58,94 @@ class TestUpdater(object): message_count = 0 received = None attempts = 0 + err_handler_called = Event() + cb_handler_called = Event() @pytest.fixture(autouse=True) def reset(self): self.message_count = 0 self.received = None self.attempts = 0 + self.err_handler_called.clear() + self.cb_handler_called.clear() def error_handler(self, bot, update, error): self.received = error.message + self.err_handler_called.set() def callback(self, bot, update): self.received = update.message.text + self.cb_handler_called.set() - # TODO: test clean= argument + # TODO: test clean= argument of Updater._bootstrap - def test_error_on_get_updates(self, monkeypatch, updater): + @pytest.mark.parametrize(('error',), + argvalues=[(TelegramError('Test Error 2'),), + (Unauthorized('Test Unauthorized'),)], + ids=('TelegramError', 'Unauthorized')) + def test_get_updates_normal_err(self, monkeypatch, updater, error): def test(*args, **kwargs): - raise TelegramError('Test Error 2') + raise error monkeypatch.setattr('telegram.Bot.get_updates', test) monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True) updater.dispatcher.add_error_handler(self.error_handler) updater.start_polling(0.01) - sleep(.1) - assert self.received == 'Test Error 2' + + # Make sure that the error handler was called + self.err_handler_called.wait() + assert self.received == error.message + + # Make sure that Updater polling thread keeps running + self.err_handler_called.clear() + self.err_handler_called.wait() + + def test_get_updates_bailout_err(self, monkeypatch, updater, caplog): + error = InvalidToken() + + def test(*args, **kwargs): + raise error + + with caplog.at_level(logging.DEBUG): + monkeypatch.setattr('telegram.Bot.get_updates', test) + monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True) + updater.dispatcher.add_error_handler(self.error_handler) + updater.start_polling(0.01) + assert self.err_handler_called.wait(0.5) is not True + + # NOTE: This test might hit a race condition and fail (though the 0.5 seconds delay above + # should work around it). + # NOTE: Checking Updater.running is problematic because it is not set to False when there's + # an unhandled exception. + # TODO: We should have a way to poll Updater status and decide if it's running or not. + assert any('unhandled exception in updater' in rec.getMessage() for rec in + caplog.get_records('call')) + + @pytest.mark.parametrize(('error',), + argvalues=[(RetryAfter(0.01),), + (TimedOut(),)], + ids=('RetryAfter', 'TimedOut')) + def test_get_updates_retries(self, monkeypatch, updater, error): + event = Event() + + def test(*args, **kwargs): + event.set() + raise error + + monkeypatch.setattr('telegram.Bot.get_updates', test) + monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True) + updater.dispatcher.add_error_handler(self.error_handler) + updater.start_polling(0.01) + + # Make sure that get_updates was called, but not the error handler + event.wait() + assert self.err_handler_called.wait(0.5) is not True + assert self.received != error.message + + # Make sure that Updater polling thread keeps running + event.clear() + event.wait() + assert self.err_handler_called.wait(0.5) is not True def test_webhook(self, monkeypatch, updater): q = Queue() @@ -145,17 +208,21 @@ class TestUpdater(object): sleep(.2) assert q.get(False) == update - def test_bootstrap_retries_success(self, monkeypatch, updater): + @pytest.mark.parametrize(('error',), + argvalues=[(TelegramError(''),)], + ids=('TelegramError',)) + def test_bootstrap_retries_success(self, monkeypatch, updater, error): retries = 2 def attempt(_, *args, **kwargs): if self.attempts < retries: self.attempts += 1 - raise TelegramError('') + raise error monkeypatch.setattr('telegram.Bot.set_webhook', attempt) - updater._bootstrap(retries, False, 'path', None) + updater.running = True + updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0) assert self.attempts == retries @pytest.mark.parametrize(('error', 'attempts'), @@ -172,8 +239,9 @@ class TestUpdater(object): monkeypatch.setattr('telegram.Bot.set_webhook', attempt) + updater.running = True with pytest.raises(type(error)): - updater._bootstrap(retries, False, 'path', None) + updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0) assert self.attempts == attempts def test_webhook_invalid_posts(self, updater):