Merge pull request #161 from tsnoam/master

better exception handling
This commit is contained in:
Jannes Höke 2016-02-07 22:34:49 +01:00
commit bec81b5c3b
8 changed files with 175 additions and 105 deletions

View file

@ -1,5 +1,11 @@
.PHONY: clean pep8 lint test install
PYLINT := pylint
NOSETESTS := nosetests
PEP257 := pep257
PEP8 := flake8
PIP := pip
clean:
rm -fr build
rm -fr dist
@ -8,19 +14,19 @@ clean:
find . -name '*~' -exec rm -f {} \;
pep257:
pep257 telegram
$(PEP257) telegram
pep8:
flake8 telegram
$(PEP8) telegram
lint:
pylint -E telegram --disable=no-name-in-module,import-error
$(PYLINT) -E telegram --disable=no-name-in-module,import-error
test:
nosetests
$(NOSETESTS) -v
install:
pip install -r requirements.txt
$(PIP) install -r requirements.txt
help:
@echo "Available targets:"
@ -29,3 +35,10 @@ help:
@echo "- pep8 Check style with flake8"
@echo "- lint Check style with pylint"
@echo "- test Run tests"
@echo
@echo "Available variables:"
@echo "- PYLINT default: $(PYLINT)"
@echo "- NOSETESTS default: $(NOSETESTS)"
@echo "- PEP257 default: $(PEP257)"
@echo "- PEP8 default: $(PEP8)"
@echo "- PIP default: $(PIP)"

View file

@ -25,6 +25,7 @@ import logging
from telegram import (User, Message, Update, UserProfilePhotos, File,
TelegramError, ReplyMarkup, TelegramObject, NullHandler)
from telegram.error import InvalidToken
from telegram.utils import request
H = NullHandler()
@ -751,5 +752,5 @@ class Bot(TelegramObject):
"""a very basic validation on token"""
left, sep, _right = token.partition(':')
if (not sep) or (not left.isdigit()) or (len(left) < 3):
raise TelegramError('Invalid token')
raise InvalidToken()
return token

View file

@ -22,11 +22,12 @@
import logging
from functools import wraps
from inspect import getargspec
from threading import Thread, BoundedSemaphore, Lock
from threading import Thread, BoundedSemaphore, Lock, Event
from re import match
from time import sleep
from telegram import (TelegramError, Update, NullHandler)
from telegram.updatequeue import Empty
H = NullHandler()
logging.getLogger(__name__).addHandler(H)
@ -49,6 +50,9 @@ def run_async(func):
function:
"""
# TODO: handle exception in async threads
# set a threading.Event to notify caller thread
@wraps(func)
def pooled(*pargs, **kwargs):
"""
@ -128,10 +132,10 @@ class Dispatcher:
Args:
bot (telegram.Bot): The bot object that should be passed to the
handlers
update_queue (UpdateQueue): The synchronized queue that will
update_queue (telegram.UpdateQueue): The synchronized queue that will
contain the updates.
"""
def __init__(self, bot, update_queue, workers=4):
def __init__(self, bot, update_queue, workers=4, exception_event=None):
self.bot = bot
self.update_queue = update_queue
self.telegram_message_handlers = []
@ -145,7 +149,8 @@ class Dispatcher:
self.error_handlers = []
self.logger = logging.getLogger(__name__)
self.running = False
self.__lock = Lock()
self.__stop_event = Event()
self.__exception_event = exception_event or Event()
global semaphore
if not semaphore:
@ -153,64 +158,65 @@ class Dispatcher:
else:
self.logger.info("Semaphore already initialized, skipping.")
class _Stop(object):
"""
A class which objects can be passed into the update queue to stop the
thread
"""
pass
def start(self):
"""
Thread target of thread 'dispatcher'. Runs in background and processes
the update queue.
"""
self.__lock.acquire()
if not self.running:
self.running = True
self.__lock.release()
self.logger.info('Dispatcher thread started')
if self.running:
self.logger.warning('already running')
return
while True:
update = None
if self.__exception_event.is_set():
msg = 'reusing dispatcher after exception event is forbidden'
self.logger.error(msg)
raise TelegramError(msg)
try:
# Pop update from update queue.
# Blocks if no updates are available.
update, context = self.update_queue.get(context=True)
self.running = True
self.logger.info('Dispatcher started')
if type(update) is self._Stop:
self.running = False
break
while 1:
try:
# Pop update from update queue.
update, context = self.update_queue.get(True, 1, True)
except Empty:
if self.__stop_event.is_set():
self.logger.info('orderly stopping')
break
elif self.__stop_event.is_set():
self.logger.critical(
'stopping due to exception in another thread')
break
continue
self.processUpdate(update, context)
self.logger.debug('Processed Update: %s with context %s'
% (update, context))
try:
self.processUpdate(update, context)
self.logger.debug('Processed Update: %s with context %s'
% (update, context))
# Dispatch any errors
except TelegramError as te:
self.logger.warn("Error was raised while processing "
"Update.")
self.dispatchError(update, te)
# Dispatch any errors
except TelegramError as te:
self.logger.warn("Error was raised while processing Update.")
self.dispatchError(update, te)
# All other errors should not stop the thread, just print them
except:
self.logger.exception("An uncaught error was raised while "
"processing an update")
else:
self.__lock.release()
# All other errors should not stop the thread, just print them
except:
self.logger.exception("An uncaught error was raised while "
"processing an update")
self.running = False
self.logger.info('Dispatcher thread stopped')
def stop(self):
"""
Stops the thread
"""
with self.__lock:
if self.running:
self.update_queue.put(self._Stop())
while self.running:
sleep(0.1)
if self.running:
self.__stop_event.set()
while self.running:
sleep(0.1)
self.__stop_event.clear()
def processUpdate(self, update, context=None):
"""

View file

@ -59,3 +59,25 @@ class TelegramError(Exception):
def __str__(self):
return '%s' % (self.message)
class Unauthorized(TelegramError):
def __init__(self):
super(Unauthorized, self).__init__('Unauthorized')
class InvalidToken(TelegramError):
def __init__(self):
super(InvalidToken, self).__init__('Invalid token')
class NetworkError(TelegramError):
pass
class TimedOut(NetworkError):
def __init__(self):
super(TimedOut, self).__init__('Timed out')

View file

@ -23,9 +23,10 @@ Queue."""
# Adjust for differences in Python versions
try:
from queue import Queue
# loading Empty here so it can be imported by users of updatequeue
from queue import Queue, Empty # flake8: noqa
except ImportError:
from Queue import Queue
from Queue import Queue, Empty # flake8: noqa
class UpdateQueue(Queue):

View file

@ -24,7 +24,7 @@ Telegram bots intuitive."""
import logging
import os
import ssl
from threading import Thread, Lock
from threading import Thread, Lock, current_thread, Event
from time import sleep
import subprocess
from signal import signal, SIGINT, SIGTERM, SIGABRT
@ -32,11 +32,6 @@ from telegram import (Bot, TelegramError, dispatcher, Dispatcher,
NullHandler, JobQueue, UpdateQueue)
from telegram.utils.webhookhandler import (WebhookServer, WebhookHandler)
try:
from urllib2 import URLError
except ImportError:
from urllib.error import URLError
H = NullHandler()
logging.getLogger(__name__).addHandler(H)
@ -85,8 +80,9 @@ class Updater:
self.bot = Bot(token, base_url)
self.update_queue = UpdateQueue()
self.job_queue = JobQueue(self.bot, job_queue_tick_interval)
self.dispatcher = Dispatcher(self.bot, self.update_queue,
workers=workers)
self.__exception_event = Event()
self.dispatcher = Dispatcher(self.bot, self.update_queue, workers,
self.__exception_event)
self.last_update_id = 0
self.logger = logging.getLogger(__name__)
self.running = False
@ -112,22 +108,30 @@ class Updater:
if not self.running:
self.running = True
# Create Thread objects
dispatcher_thread = Thread(target=self.dispatcher.start,
name="dispatcher")
updater_thread = Thread(target=self._start_polling,
name="updater",
args=(poll_interval,
timeout,
network_delay))
# Start threads
dispatcher_thread.start()
updater_thread.start()
# Create & start threads
self._init_thread(self.dispatcher.start, "dispatcher")
self._init_thread(self._start_polling, "updater",
poll_interval, timeout, network_delay)
# Return the update queue so the main thread can insert updates
return self.update_queue
def _init_thread(self, target, name, *args, **kwargs):
thr = Thread(target=self._thread_wrapper, name=name,
args=(target,) + args, kwargs=kwargs)
thr.start()
def _thread_wrapper(self, target, *args, **kwargs):
thr_name = current_thread()
self.logger.debug('{0} - started'.format(thr_name))
try:
target(*args, **kwargs)
except Exception:
self.__exception_event.set()
self.logger.exception('unhandled exception')
raise
self.logger.debug('{0} - ended'.format(thr_name))
def start_webhook(self,
listen='127.0.0.1',
port=80,
@ -181,7 +185,7 @@ class Updater:
Dispatcher.
"""
current_interval = poll_interval
cur_interval = poll_interval
self.logger.info('Updater thread started')
# Remove webhook
@ -192,36 +196,44 @@ class Updater:
updates = self.bot.getUpdates(self.last_update_id,
timeout=timeout,
network_delay=network_delay)
except TelegramError as te:
self.logger.error(
"Error while getting Updates: {0}".format(te))
# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(te)
cur_interval = self._increase_poll_interval(cur_interval)
else:
if not self.running:
if len(updates) > 0:
self.logger.info('Updates ignored and will be pulled '
'again on restart.')
break
for update in updates:
self.update_queue.put(update)
self.last_update_id = update.update_id + 1
current_interval = poll_interval
if updates:
for update in updates:
self.update_queue.put(update)
self.last_update_id = updates[-1].update_id + 1
sleep(current_interval)
except TelegramError as te:
# Put the error into the update queue and let the Dispatcher
# broadcast it
self.update_queue.put(te)
sleep(current_interval)
cur_interval = poll_interval
except URLError as e:
self.logger.error("Error while getting Updates: %s" % e)
# increase waiting times on subsequent errors up to 30secs
if current_interval == 0:
current_interval = 1
elif current_interval < 30:
current_interval += current_interval / 2
elif current_interval > 30:
current_interval = 30
sleep(cur_interval)
self.logger.info('Updater thread stopped')
@staticmethod
def _increase_poll_interval(current_interval):
# increase waiting times on subsequent errors up to 30secs
if current_interval == 0:
current_interval = 1
elif current_interval < 30:
current_interval += current_interval / 2
elif current_interval > 30:
current_interval = 30
return current_interval
def _start_webhook(self, listen, port, url_path, cert, key):
self.logger.info('Updater thread started')
use_ssl = cert is not None and key is not None
@ -248,6 +260,7 @@ class Updater:
keyfile=key,
server_side=True)
except ssl.SSLError as error:
self.logger.exception('failed to init SSL socket')
raise TelegramError(str(error))
else:
raise TelegramError('SSL Certificate invalid')

View file

@ -33,14 +33,17 @@ except ImportError:
from http.client import HTTPException
try:
# python3
from urllib.request import urlopen, urlretrieve, Request
from urllib.error import HTTPError
from urllib.error import HTTPError, URLError
except ImportError:
# python2
from urllib import urlretrieve
from urllib2 import urlopen, Request
from urllib2 import urlopen, Request, URLError
from urllib2 import HTTPError
from telegram import (InputFile, TelegramError)
from telegram.error import Unauthorized, NetworkError, TimedOut
def _parse(json_data):
@ -72,10 +75,15 @@ def _try_except_req(func):
def decorator(*args, **kwargs):
try:
return func(*args, **kwargs)
except HTTPError as error:
if error.getcode() == 403:
raise TelegramError('Unauthorized')
if error.getcode() == 502:
# `HTTPError` inherits from `URLError` so `HTTPError` handling must
# come first.
errcode = error.getcode()
if errcode in (401, 403):
raise Unauthorized()
if errcode == 502:
raise TelegramError('Bad Gateway')
try:
@ -83,14 +91,20 @@ def _try_except_req(func):
except ValueError:
message = 'Unknown HTTPError {0}'.format(error.getcode())
raise TelegramError(message)
except (SSLError, socket.timeout) as error:
if "operation timed out" in str(error):
raise TelegramError("Timed out")
raise NetworkError('{0} ({1})'.format(message, errcode))
except URLError as error:
raise NetworkError('URLError: {0!r}'.format(error))
except (SSLError, socket.timeout) as error:
err_s = str(error)
if "operation timed out" in err_s:
raise TimedOut()
raise NetworkError(err_s)
raise TelegramError(str(error))
except HTTPException as error:
raise TelegramError('HTTPException: {0!r}'.format(error))
raise NetworkError('HTTPException: {0!r}'.format(error))
return decorator

View file

@ -145,7 +145,7 @@ class BotTest(BaseTest, unittest.TestCase):
def _test_invalid_token(self, token):
print('Testing invalid token: {0}'.format(token))
self.assertRaisesRegexp(telegram.TelegramError, 'Invalid token', telegram.Bot, token)
self.assertRaisesRegexp(telegram.error.InvalidToken, 'Invalid token', telegram.Bot, token)
def testInvalidToken1(self):
self._test_invalid_token('123')
@ -158,7 +158,7 @@ class BotTest(BaseTest, unittest.TestCase):
def testUnauthToken(self):
print('Testing unauthorized token')
with self.assertRaisesRegexp(telegram.TelegramError, 'Unauthorized'):
with self.assertRaisesRegexp(telegram.error.Unauthorized, 'Unauthorized'):
bot = telegram.Bot('1234:abcd1234')
bot.getMe()