Updater improvements (#1018)

- Refactor bootstrap phase to be resilient for network errors
 - Retry bootstrap phase indefinitely (by default) on network errors
 - Improved logs
 - Improved unitests for polling updater

fixes #605
This commit is contained in:
Noam Meltzer 2018-03-02 23:11:16 +02:00 committed by GitHub
parent 811369d1a0
commit a6bf456645
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 187 additions and 77 deletions

View file

@ -149,7 +149,7 @@ class Updater(object):
target(*args, **kwargs)
except Exception:
self.__exception_event.set()
self.logger.exception('unhandled exception')
self.logger.exception('unhandled exception in %s', thr_name)
raise
self.logger.debug('{0} - ended'.format(thr_name))
@ -157,7 +157,7 @@ class Updater(object):
poll_interval=0.0,
timeout=10,
clean=False,
bootstrap_retries=0,
bootstrap_retries=-1,
read_latency=2.,
allowed_updates=None):
"""Starts polling updates from Telegram.
@ -171,8 +171,8 @@ class Updater(object):
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
`Updater` will retry on failures on the Telegram server.
* < 0 - retry indefinitely
* 0 - no retries (default)
* < 0 - retry indefinitely (default)
* 0 - no retries
* > 0 - retry up to X times
allowed_updates (List[:obj:`str`], optional): Passed to
@ -229,8 +229,8 @@ class Updater(object):
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
`Updater` will retry on failures on the Telegram server.
* < 0 - retry indefinitely
* 0 - no retries (default)
* < 0 - retry indefinitely (default)
* 0 - no retries
* > 0 - retry up to X times
webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind
@ -242,7 +242,6 @@ class Updater(object):
:obj:`Queue`: The update queue that can be filled from the main thread.
"""
with self.__lock:
if not self.running:
self.running = True
@ -262,46 +261,72 @@ class Updater(object):
# updates from Telegram and inserts them in the update queue of the
# Dispatcher.
cur_interval = poll_interval
self.logger.debug('Updater thread started')
self.logger.debug('Updater thread started (polling)')
self._bootstrap(bootstrap_retries, clean=clean, webhook_url='', allowed_updates=None)
while self.running:
try:
updates = self.bot.get_updates(
self.last_update_id,
timeout=timeout,
read_latency=read_latency,
allowed_updates=allowed_updates)
except RetryAfter as e:
self.logger.info(str(e))
cur_interval = 0.5 + e.retry_after
except TimedOut as toe:
self.logger.debug('Timed out getting Updates: %s', toe)
# If get_updates() failed due to timeout, we should retry asap.
cur_interval = 0
except TelegramError as te:
self.logger.error('Error while getting Updates: %s', te)
self.logger.debug('Bootstrap done')
# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(te)
def polling_action_cb():
updates = self.bot.get_updates(
self.last_update_id, timeout=timeout, read_latency=read_latency,
allowed_updates=allowed_updates)
cur_interval = self._increase_poll_interval(cur_interval)
else:
if updates:
if not self.running:
if len(updates) > 0:
self.logger.debug('Updates ignored and will be pulled '
'again on restart.')
break
if updates:
self.logger.debug('Updates ignored and will be pulled again on restart')
else:
for update in updates:
self.update_queue.put(update)
self.last_update_id = updates[-1].update_id + 1
cur_interval = poll_interval
return True
def polling_onerr_cb(exc):
# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(exc)
self._network_loop_retry(polling_action_cb, polling_onerr_cb, 'getting Updates',
poll_interval)
def _network_loop_retry(self, action_cb, onerr_cb, description, interval):
"""Perform a loop calling `action_cb`, retrying after network errors.
Stop condition for loop: `self.running` evaluates False or return value of `action_cb`
evaluates False.
Args:
action_cb (:obj:`callable`): Network oriented callback function to call.
onerr_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the
exception object as a parameter.
description (:obj:`str`): Description text to use for logs and exception raised.
interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to
`action_cb`.
"""
self.logger.debug('Start network loop retry %s', description)
cur_interval = interval
while self.running:
try:
if not action_cb():
break
except RetryAfter as e:
self.logger.info('%s', e)
cur_interval = 0.5 + e.retry_after
except TimedOut as toe:
self.logger.debug('Timed out %s: %s', description, toe)
# If failure is due to timeout, we should retry asap.
cur_interval = 0
except InvalidToken as pex:
self.logger.error('Invalid token; aborting')
raise pex
except TelegramError as te:
self.logger.error('Error while %s: %s', description, te)
onerr_cb(te)
cur_interval = self._increase_poll_interval(cur_interval)
else:
cur_interval = interval
if cur_interval:
sleep(cur_interval)
@ -319,7 +344,7 @@ class Updater(object):
def _start_webhook(self, listen, port, url_path, cert, key, bootstrap_retries, clean,
webhook_url, allowed_updates):
self.logger.debug('Updater thread started')
self.logger.debug('Updater thread started (webhook)')
use_ssl = cert is not None and key is not None
if not url_path.startswith('/'):
url_path = '/{0}'.format(url_path)
@ -370,39 +395,56 @@ class Updater(object):
def _gen_webhook_url(listen, port, url_path):
return 'https://{listen}:{port}{path}'.format(listen=listen, port=port, path=url_path)
def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None):
retries = 0
while 1:
def _bootstrap(self, max_retries, clean, webhook_url, allowed_updates, cert=None,
bootstrap_interval=5):
retries = [0]
try:
if clean:
# Disable webhook for cleaning
self.bot.delete_webhook()
self._clean_updates()
sleep(1)
def bootstrap_del_webhook():
self.bot.delete_webhook()
return False
self.bot.set_webhook(
url=webhook_url, certificate=cert, allowed_updates=allowed_updates)
except (Unauthorized, InvalidToken):
raise
except TelegramError:
msg = 'error in bootstrap phase; try={0} max_retries={1}'.format(retries,
max_retries)
if max_retries < 0 or retries < max_retries:
self.logger.warning(msg)
retries += 1
else:
self.logger.exception(msg)
raise
def bootstrap_clean_updates():
self.logger.debug('Cleaning updates from Telegram server')
updates = self.bot.get_updates()
while updates:
updates = self.bot.get_updates(updates[-1].update_id + 1)
return False
def bootstrap_set_webhook():
self.bot.set_webhook(
url=webhook_url, certificate=cert, allowed_updates=allowed_updates)
return False
def bootstrap_onerr_cb(exc):
if not isinstance(exc, Unauthorized) and (max_retries < 0 or retries[0] < max_retries):
retries[0] += 1
self.logger.warning('Failed bootstrap phase; try=%s max_retries=%s',
retries[0], max_retries)
else:
break
self.logger.error('Failed bootstrap phase after %s retries (%s)', retries[0], exc)
raise exc
# Cleaning pending messages is done by polling for them - so we need to delete webhook if
# one is configured.
# We also take this chance to delete pre-configured webhook if this is a polling Updater.
# NOTE: We don't know ahead if a webhook is configured, so we just delete.
if clean or not webhook_url:
self._network_loop_retry(bootstrap_del_webhook, bootstrap_onerr_cb,
'bootstrap del webhook', bootstrap_interval)
retries[0] = 0
# Clean pending messages, if requested.
if clean:
self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb,
'bootstrap clean updates', bootstrap_interval)
retries[0] = 0
sleep(1)
def _clean_updates(self):
self.logger.debug('Cleaning updates from Telegram server')
updates = self.bot.get_updates()
while updates:
updates = self.bot.get_updates(updates[-1].update_id + 1)
# Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set,
# so we set it anyhow.
if webhook_url:
self._network_loop_retry(bootstrap_set_webhook, bootstrap_onerr_cb,
'bootstrap set webhook', bootstrap_interval)
def stop(self):
"""Stops the polling/webhook thread, the dispatcher and the job queue."""

View file

@ -23,7 +23,7 @@ import sys
from functools import partial
from queue import Queue
from random import randrange
from threading import Thread
from threading import Thread, Event
from time import sleep
try:
@ -38,7 +38,7 @@ import pytest
from future.builtins import bytes
from telegram import TelegramError, Message, User, Chat, Update, Bot
from telegram.error import Unauthorized, InvalidToken
from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter
from telegram.ext import Updater
signalskip = pytest.mark.skipif(sys.platform == 'win32',
@ -58,31 +58,94 @@ class TestUpdater(object):
message_count = 0
received = None
attempts = 0
err_handler_called = Event()
cb_handler_called = Event()
@pytest.fixture(autouse=True)
def reset(self):
self.message_count = 0
self.received = None
self.attempts = 0
self.err_handler_called.clear()
self.cb_handler_called.clear()
def error_handler(self, bot, update, error):
self.received = error.message
self.err_handler_called.set()
def callback(self, bot, update):
self.received = update.message.text
self.cb_handler_called.set()
# TODO: test clean= argument
# TODO: test clean= argument of Updater._bootstrap
def test_error_on_get_updates(self, monkeypatch, updater):
@pytest.mark.parametrize(('error',),
argvalues=[(TelegramError('Test Error 2'),),
(Unauthorized('Test Unauthorized'),)],
ids=('TelegramError', 'Unauthorized'))
def test_get_updates_normal_err(self, monkeypatch, updater, error):
def test(*args, **kwargs):
raise TelegramError('Test Error 2')
raise error
monkeypatch.setattr('telegram.Bot.get_updates', test)
monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True)
updater.dispatcher.add_error_handler(self.error_handler)
updater.start_polling(0.01)
sleep(.1)
assert self.received == 'Test Error 2'
# Make sure that the error handler was called
self.err_handler_called.wait()
assert self.received == error.message
# Make sure that Updater polling thread keeps running
self.err_handler_called.clear()
self.err_handler_called.wait()
def test_get_updates_bailout_err(self, monkeypatch, updater, caplog):
error = InvalidToken()
def test(*args, **kwargs):
raise error
with caplog.at_level(logging.DEBUG):
monkeypatch.setattr('telegram.Bot.get_updates', test)
monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True)
updater.dispatcher.add_error_handler(self.error_handler)
updater.start_polling(0.01)
assert self.err_handler_called.wait(0.5) is not True
# NOTE: This test might hit a race condition and fail (though the 0.5 seconds delay above
# should work around it).
# NOTE: Checking Updater.running is problematic because it is not set to False when there's
# an unhandled exception.
# TODO: We should have a way to poll Updater status and decide if it's running or not.
assert any('unhandled exception in updater' in rec.getMessage() for rec in
caplog.get_records('call'))
@pytest.mark.parametrize(('error',),
argvalues=[(RetryAfter(0.01),),
(TimedOut(),)],
ids=('RetryAfter', 'TimedOut'))
def test_get_updates_retries(self, monkeypatch, updater, error):
event = Event()
def test(*args, **kwargs):
event.set()
raise error
monkeypatch.setattr('telegram.Bot.get_updates', test)
monkeypatch.setattr('telegram.Bot.set_webhook', lambda *args, **kwargs: True)
updater.dispatcher.add_error_handler(self.error_handler)
updater.start_polling(0.01)
# Make sure that get_updates was called, but not the error handler
event.wait()
assert self.err_handler_called.wait(0.5) is not True
assert self.received != error.message
# Make sure that Updater polling thread keeps running
event.clear()
event.wait()
assert self.err_handler_called.wait(0.5) is not True
def test_webhook(self, monkeypatch, updater):
q = Queue()
@ -145,17 +208,21 @@ class TestUpdater(object):
sleep(.2)
assert q.get(False) == update
def test_bootstrap_retries_success(self, monkeypatch, updater):
@pytest.mark.parametrize(('error',),
argvalues=[(TelegramError(''),)],
ids=('TelegramError',))
def test_bootstrap_retries_success(self, monkeypatch, updater, error):
retries = 2
def attempt(_, *args, **kwargs):
if self.attempts < retries:
self.attempts += 1
raise TelegramError('')
raise error
monkeypatch.setattr('telegram.Bot.set_webhook', attempt)
updater._bootstrap(retries, False, 'path', None)
updater.running = True
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
assert self.attempts == retries
@pytest.mark.parametrize(('error', 'attempts'),
@ -172,8 +239,9 @@ class TestUpdater(object):
monkeypatch.setattr('telegram.Bot.set_webhook', attempt)
updater.running = True
with pytest.raises(type(error)):
updater._bootstrap(retries, False, 'path', None)
updater._bootstrap(retries, False, 'path', None, bootstrap_interval=0)
assert self.attempts == attempts
def test_webhook_invalid_posts(self, updater):