move cleaning updates to bootstrapping phase (#282)

This commit is contained in:
Jannes Höke 2016-05-10 23:58:55 +02:00 committed by Noam Meltzer
parent 252cafb04c
commit 6ec81dd552
2 changed files with 64 additions and 60 deletions

View file

@ -96,6 +96,23 @@ class Updater(object):
self.__threads = []
""":type: list[Thread]"""
def _init_thread(self, target, name, *args, **kwargs):
thr = Thread(target=self._thread_wrapper, name=name,
args=(target,) + args, kwargs=kwargs)
thr.start()
self.__threads.append(thr)
def _thread_wrapper(self, target, *args, **kwargs):
thr_name = current_thread().name
self.logger.debug('{0} - started'.format(thr_name))
try:
target(*args, **kwargs)
except Exception:
self.__exception_event.set()
self.logger.exception('unhandled exception')
raise
self.logger.debug('{0} - ended'.format(thr_name))
def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2,
clean=False, bootstrap_retries=0):
"""
@ -123,35 +140,16 @@ class Updater(object):
with self.__lock:
if not self.running:
self.running = True
if clean:
self._clean_updates()
# Create & start threads
self._init_thread(self.dispatcher.start, "dispatcher")
self._init_thread(self._start_polling, "updater",
poll_interval, timeout, network_delay,
bootstrap_retries)
bootstrap_retries, clean)
# Return the update queue so the main thread can insert updates
return self.update_queue
def _init_thread(self, target, name, *args, **kwargs):
thr = Thread(target=self._thread_wrapper, name=name,
args=(target,) + args, kwargs=kwargs)
thr.start()
self.__threads.append(thr)
def _thread_wrapper(self, target, *args, **kwargs):
thr_name = current_thread().name
self.logger.debug('{0} - started'.format(thr_name))
try:
target(*args, **kwargs)
except Exception:
self.__exception_event.set()
self.logger.exception('unhandled exception')
raise
self.logger.debug('{0} - ended'.format(thr_name))
def start_webhook(self,
listen='127.0.0.1',
port=80,
@ -194,20 +192,18 @@ class Updater(object):
with self.__lock:
if not self.running:
self.running = True
if clean:
self._clean_updates()
# Create & start threads
self._init_thread(self.dispatcher.start, "dispatcher"),
self._init_thread(self._start_webhook, "updater", listen,
port, url_path, cert, key, bootstrap_retries,
webhook_url)
clean, webhook_url)
# Return the update queue so the main thread can insert updates
return self.update_queue
def _start_polling(self, poll_interval, timeout, network_delay,
bootstrap_retries):
bootstrap_retries, clean):
"""
Thread target of thread 'updater'. Runs in background, pulls
updates from Telegram and inserts them in the update queue of the
@ -217,7 +213,7 @@ class Updater(object):
cur_interval = poll_interval
self.logger.debug('Updater thread started')
self._set_webhook(None, bootstrap_retries, None)
self._bootstrap(bootstrap_retries, clean=clean, webhook_url='')
while self.running:
try:
@ -249,28 +245,6 @@ class Updater(object):
sleep(cur_interval)
def _set_webhook(self, webhook_url, max_retries, cert):
retries = 0
while 1:
try:
# Remove webhook
self.bot.setWebhook(webhook_url=webhook_url,
certificate=cert)
except (Unauthorized, InvalidToken):
raise
except TelegramError:
msg = 'failed to set webhook; try={0} max_retries={1}'.format(
retries, max_retries)
if max_retries < 0 or retries < max_retries:
self.logger.info(msg)
retries += 1
else:
self.logger.exception(msg)
raise
else:
break
sleep(1)
@staticmethod
def _increase_poll_interval(current_interval):
# increase waiting times on subsequent errors up to 30secs
@ -283,7 +257,7 @@ class Updater(object):
return current_interval
def _start_webhook(self, listen, port, url_path, cert, key,
bootstrap_retries, webhook_url):
bootstrap_retries, clean, webhook_url):
self.logger.debug('Updater thread started')
use_ssl = cert is not None and key is not None
if not url_path.startswith('/'):
@ -300,8 +274,11 @@ class Updater(object):
if not webhook_url:
webhook_url = self._gen_webhook_url(listen, port, url_path)
self._set_webhook(webhook_url, bootstrap_retries,
open(cert, 'rb'))
self._bootstrap(max_retries=bootstrap_retries, clean=clean,
webhook_url=webhook_url, cert=open(cert, 'rb'))
elif clean:
self.logger.warning("cleaning updates is not supported if "
"SSL-termination happens elsewhere; skipping")
self.httpd.serve_forever(poll_interval=1)
@ -326,12 +303,40 @@ class Updater(object):
else:
raise TelegramError('SSL Certificate invalid')
def _gen_webhook_url(self, listen, port, url_path):
@staticmethod
def _gen_webhook_url(listen, port, url_path):
return 'https://{listen}:{port}{path}'.format(
listen=listen,
port=port,
path=url_path)
def _bootstrap(self, max_retries, clean, webhook_url, cert=None):
retries = 0
while True:
try:
if clean:
# Disable webhook for cleaning
self.bot.setWebhook(webhook_url='')
self._clean_updates()
self.bot.setWebhook(webhook_url=webhook_url,
certificate=cert)
except (Unauthorized, InvalidToken):
raise
except TelegramError:
msg = 'error in bootstrap phase; try={0} max_retries={1}'\
.format(retries, max_retries)
if max_retries < 0 or retries < max_retries:
self.logger.warning(msg)
retries += 1
else:
self.logger.exception(msg)
raise
else:
break
sleep(1)
def _clean_updates(self):
self.logger.debug('Cleaning updates from Telegram server')
updates = self.bot.getUpdates()

View file

@ -489,7 +489,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
retries = 3
self._setup_updater('', messages=0, bootstrap_retries=retries)
self.updater._set_webhook('path', retries, None)
self.updater._bootstrap(retries, False, 'path', None)
self.assertEqual(self.updater.bot.bootstrap_attempts, retries)
def test_bootstrap_retries_unauth(self):
@ -499,8 +499,8 @@ class UpdaterTest(BaseTest, unittest.TestCase):
bootstrap_retries=retries,
bootstrap_err=Unauthorized())
self.assertRaises(Unauthorized, self.updater._set_webhook, 'path',
retries, None)
self.assertRaises(Unauthorized, self.updater._bootstrap,
retries, False, 'path', None)
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
def test_bootstrap_retries_invalid_token(self):
@ -510,17 +510,16 @@ class UpdaterTest(BaseTest, unittest.TestCase):
bootstrap_retries=retries,
bootstrap_err=InvalidToken())
self.assertRaises(InvalidToken, self.updater._set_webhook, 'path',
retries, None)
self.assertRaises(InvalidToken, self.updater._bootstrap,
retries, False, 'path', None)
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
def test_bootstrap_retries_fail(self):
retries = 1
self._setup_updater('', messages=0, bootstrap_retries=retries)
self.assertRaisesRegexp(TelegramError, 'test',
self.updater._set_webhook, 'path', retries - 1,
None)
self.assertRaisesRegexp(TelegramError, 'test', self.updater._bootstrap,
retries - 1, False, 'path', None)
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
def test_webhook_invalid_posts(self):
@ -529,7 +528,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
ip = '127.0.0.1'
port = randrange(1024, 49152) # select random port for travis
thr = Thread(target=self.updater._start_webhook,
args=(ip, port, '', None, None, 0, None))
args=(ip, port, '', None, None, 0, False, None))
thr.start()
sleep(0.5)