Flow control ability in Dispatcher (#738)

fixes #666
This commit is contained in:
Ihor Polyakov 2017-07-30 00:15:43 +07:00 committed by Noam Meltzer
parent 5d3f5575ab
commit 6aacde189e
4 changed files with 132 additions and 14 deletions

View file

@ -18,7 +18,7 @@
# along with this program. If not, see [http://www.gnu.org/licenses/]. # along with this program. If not, see [http://www.gnu.org/licenses/].
"""Extensions over the Telegram Bot API to facilitate bot making""" """Extensions over the Telegram Bot API to facilitate bot making"""
from .dispatcher import Dispatcher from .dispatcher import Dispatcher, DispatcherHandlerContinue, DispatcherHandlerStop, run_async
from .jobqueue import JobQueue, Job from .jobqueue import JobQueue, Job
from .updater import Updater from .updater import Updater
from .callbackqueryhandler import CallbackQueryHandler from .callbackqueryhandler import CallbackQueryHandler
@ -42,4 +42,5 @@ __all__ = ('Dispatcher', 'JobQueue', 'Job', 'Updater', 'CallbackQueryHandler',
'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler', 'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler',
'MessageHandler', 'BaseFilter', 'Filters', 'RegexHandler', 'StringCommandHandler', 'MessageHandler', 'BaseFilter', 'Filters', 'RegexHandler', 'StringCommandHandler',
'StringRegexHandler', 'TypeHandler', 'ConversationHandler', 'StringRegexHandler', 'TypeHandler', 'ConversationHandler',
'PreCheckoutQueryHandler', 'ShippingQueryHandler', 'MessageQueue', 'DelayQueue') 'PreCheckoutQueryHandler', 'ShippingQueryHandler', 'MessageQueue', 'DelayQueue',
'DispatcherHandlerContinue', 'DispatcherHandlerStop', 'run_async')

View file

@ -55,6 +55,28 @@ def run_async(func):
return async_func return async_func
class DispatcherHandlerFlow(Exception):
"""
Dispatcher update processing manipulation exceptions are base on this class.
"""
pass
class DispatcherHandlerContinue(DispatcherHandlerFlow):
"""
If check Handler's check_updated returned true, but execution of handler raised this,
then handlers checking will continue.
"""
pass
class DispatcherHandlerStop(DispatcherHandlerFlow):
"""
Raise this in handler to prevent execution any other handlers (even in different group).
"""
pass
class Dispatcher(object): class Dispatcher(object):
""" """
This class dispatches all kinds of updates to its registered handlers. This class dispatches all kinds of updates to its registered handlers.
@ -162,6 +184,9 @@ class Dispatcher(object):
break break
promise.run() promise.run()
if isinstance(promise.exception, DispatcherHandlerFlow):
self.logger.warning('DispatcherHandlerFlow is not supported with async '
'functions; func: %s', promise.pooled_function.__name__)
def run_async(self, func, *args, **kwargs): def run_async(self, func, *args, **kwargs):
""" """
@ -255,24 +280,29 @@ class Dispatcher(object):
Processes a single update. Processes a single update.
Args: Args:
update (:obj:`str` | :class:`telegram.Update`): The update to process. update (:obj:`str` | :class:`telegram.Update` | :class:`telegram.TelegramError`):
The update to process.
""" """
# An error happened while polling # An error happened while polling
if isinstance(update, TelegramError): if isinstance(update, TelegramError):
self.dispatch_error(None, update) self.dispatch_error(None, update)
return
else: for group in self.groups:
for group in self.groups: try:
for handler in self.handlers[group]: for handler in self.handlers[group]:
try: try:
if handler.check_update(update): if handler.check_update(update):
handler.handle_update(update, self) try:
handler.handle_update(update, self)
except DispatcherHandlerContinue:
continue
break break
# Dispatch any errors except DispatcherHandlerFlow:
raise
except TelegramError as te: except TelegramError as te:
self.logger.warn('A TelegramError was raised while processing the ' self.logger.warning('A TelegramError was raised while processing the '
'Update.') 'Update.')
try: try:
self.dispatch_error(update, te) self.dispatch_error(update, te)
@ -287,6 +317,8 @@ class Dispatcher(object):
self.logger.exception('An uncaught error was raised while ' self.logger.exception('An uncaught error was raised while '
'processing the update') 'processing the update')
break break
except DispatcherHandlerStop:
break
def add_handler(self, handler, group=DEFAULT_GROUP): def add_handler(self, handler, group=DEFAULT_GROUP):
""" """
@ -297,7 +329,9 @@ class Dispatcher(object):
A handler must be an instance of a subclass of :class:`telegram.ext.Handler`. All handlers A handler must be an instance of a subclass of :class:`telegram.ext.Handler`. All handlers
are organized in groups with a numeric value. The default group is 0. All groups will be are organized in groups with a numeric value. The default group is 0. All groups will be
evaluated for handling an update, but only 0 or 1 handler per group will be used. evaluated for handling an update, but only 0 or 1 handler per group will be used,
except situations when :class:`telegram.DispatcherHandlerContinue` or
:class:`telegram.DispatcherHandlerStop` were raised.
The priority/order of handlers is determined as follows: The priority/order of handlers is determined as follows:
@ -305,6 +339,10 @@ class Dispatcher(object):
* The first handler in a group which should handle an update will be * The first handler in a group which should handle an update will be
used. Other handlers from the group will not be used. The order in used. Other handlers from the group will not be used. The order in
which handlers were added to the group defines the priority. which handlers were added to the group defines the priority.
* If :class:`telegram.DispatcherHandlerContinue` was raised, then next handler in the
same group will be called.
* If :class:`telegram.DispatcherHandlerStop` was raised, then zero handlers (even
from other groups) will called.
Args: Args:
handler (:class:`telegram.ext.Handler`): A Handler instance. handler (:class:`telegram.ext.Handler`): A Handler instance.
@ -343,7 +381,7 @@ class Dispatcher(object):
Registers an error handler in the Dispatcher. Registers an error handler in the Dispatcher.
Args: Args:
handler (:obj:`callable`): A function that takes ``Bot, Update, TelegramError`` as callback (:obj:`callable`): A function that takes ``Bot, Update, TelegramError`` as
arguments. arguments.
""" """
@ -354,7 +392,7 @@ class Dispatcher(object):
Removes an error handler. Removes an error handler.
Args: Args:
handler (:obj:`callable`): The error handler to remove. callback (:obj:`callable`): The error handler to remove.
""" """
if callback in self.error_handlers: if callback in self.error_handlers:
@ -365,7 +403,7 @@ class Dispatcher(object):
Dispatches an error. Dispatches an error.
Args: Args:
update (:obj:`str` | :class:`telegram.Update`): The update that caused the error update (:obj:`str` | :class:`telegram.Update` | None): The update that caused the error
error (:class:`telegram.TelegramError`): The Telegram error that was raised. error (:class:`telegram.TelegramError`): The Telegram error that was raised.
""" """

View file

@ -56,3 +56,7 @@ class Promise(object):
if self._exception is not None: if self._exception is not None:
raise self._exception # pylint: disable=raising-bad-type raise self._exception # pylint: disable=raising-bad-type
return self._result return self._result
@property
def exception(self):
return self._exception

View file

@ -939,6 +939,81 @@ class UpdaterTest(BaseTest, unittest.TestCase):
def test_noTokenOrBot(self): def test_noTokenOrBot(self):
self.assertRaises(ValueError, Updater) self.assertRaises(ValueError, Updater)
def test_dispatcher_handler_flow_continue(self):
passed = []
def start1(b, u):
passed.append('start1')
raise DispatcherHandlerContinue
def start2(b, u):
passed.append('start2')
def start3(b, u):
passed.append('start3')
def error(b, u, e):
passed.append('error')
passed.append(e)
# noinspection PyTypeChecker
update = Update(1, message=Message(1, None, None, None, text='/start', bot=self._bot))
# Without raising Continue everything should work as before
passed = []
dp = Dispatcher(self._bot, Queue())
dp.add_handler(CommandHandler('start', start3))
dp.add_handler(CommandHandler('start', start2))
dp.add_error_handler(error)
dp.process_update(update)
self.assertEqual(passed, ['start3'])
# If Continue raised next handler should be proceed.
passed = []
dp = Dispatcher(self._bot, Queue())
dp.add_handler(CommandHandler('start', start1))
dp.add_handler(CommandHandler('start', start2))
dp.process_update(update)
self.assertEqual(passed, ['start1', 'start2'])
def test_dispatcher_handler_flow_stop(self):
passed = []
def start1(b, u):
passed.append('start1')
raise DispatcherHandlerStop
def start2(b, u):
passed.append('start2')
def start3(b, u):
passed.append('start3')
def error(b, u, e):
passed.append('error')
passed.append(e)
# noinspection PyTypeChecker
update = Update(1, message=Message(1, None, None, None, text='/start', bot=self._bot))
# Without raising Stop everything should work as before
passed = []
dp = Dispatcher(self._bot, Queue())
dp.add_handler(CommandHandler('start', start3), 1)
dp.add_handler(CommandHandler('start', start2), 2)
dp.add_error_handler(error)
dp.process_update(update)
self.assertEqual(passed, ['start3', 'start2'])
# If Stop raised handlers in other groups should not be called.
passed = []
dp = Dispatcher(self._bot, Queue())
dp.add_handler(CommandHandler('start', start1), 1)
dp.add_handler(CommandHandler('start', start3), 1)
dp.add_handler(CommandHandler('start', start2), 2)
dp.process_update(update)
self.assertEqual(passed, ['start1'])
class MockBot(object): class MockBot(object):
def __init__(self, def __init__(self,