ConversationHandler (#331)

* initial commit for conversationhandler and example

* implement simple Promise for run_async/conversationhandler

* refactor Promise._done to done

* add handling for timed out Promises

* correctly handle promises with None results

* fix handling tuple states

* update comments on example

* Added a first test on the ConversationHandler.

* Fixed a small typo.

* Yapf'd.

* add sphinx doc for conversation handler

* fix title for callbackqueryhandler sphinx docs
This commit is contained in:
Jannes Höke 2016-07-15 01:30:54 +02:00 committed by GitHub
parent e3fe1d2632
commit ad3eec2af8
19 changed files with 625 additions and 20 deletions

View file

@ -1,7 +1,7 @@
telegram.ext.handler module
===========================
telegram.ext.callbackqueryhandler module
========================================
.. automodule:: telegram.ext.handler
.. automodule:: telegram.ext.callbackqueryhandler
:members:
:undoc-members:
:show-inheritance:

View file

@ -0,0 +1,7 @@
telegram.ext.conversationhandler module
=======================================
.. automodule:: telegram.ext.conversationhandler
:members:
:undoc-members:
:show-inheritance:

View file

@ -11,6 +11,7 @@ Submodules
telegram.ext.jobqueue
telegram.ext.handler
telegram.ext.choseninlineresulthandler
telegram.ext.conversationhandler
telegram.ext.commandhandler
telegram.ext.inlinequeryhandler
telegram.ext.messagehandler

160
examples/conversationbot.py Normal file
View file

@ -0,0 +1,160 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Simple Bot to reply to Telegram messages
# This program is dedicated to the public domain under the CC0 license.
"""
This Bot uses the Updater class to handle the bot.
First, a few callback functions are defined. Then, those functions are passed to
the Dispatcher and registered at their respective places.
Then, the bot is started and runs until we press Ctrl-C on the command line.
Usage:
Example of a bot-user conversation using ConversationHandler.
Send /start to initiate the conversation.
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
from telegram import (ReplyKeyboardMarkup)
from telegram.ext import (Updater, CommandHandler, MessageHandler, Filters, RegexHandler,
ConversationHandler)
import logging
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
GENDER, PHOTO, LOCATION, BIO = range(4)
def start(bot, update):
reply_keyboard = [['Boy', 'Girl', 'Other']]
bot.sendMessage(update.message.chat_id,
text='Hi! My name is Professor Bot. I will hold a conversation with you. '
'Send /cancel to stop talking to me.\n\n'
'Are you a boy or a girl?',
reply_markup=ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True))
return GENDER
def gender(bot, update):
user = update.message.from_user
logger.info("Gender of %s: %s" % (user.first_name, update.message.text))
bot.sendMessage(update.message.chat_id,
text='I see! Please send me a photo of yourself, '
'so I know what you look like, or send /skip if you don\'t want to.')
return PHOTO
def photo(bot, update):
user = update.message.from_user
photo_file = bot.getFile(update.message.photo[-1].file_id)
photo_file.download('user_photo.jpg')
logger.info("Photo of %s: %s" % (user.first_name, 'user_photo.jpg'))
bot.sendMessage(update.message.chat_id, text='Gorgeous! Now, send me your location please, '
'or send /skip if you don\'t want to.')
return LOCATION
def skip_photo(bot, update):
user = update.message.from_user
logger.info("User %s did not send a photo." % user.first_name)
bot.sendMessage(update.message.chat_id, text='I bet you look great! Now, send me your '
'location please, or send /skip.')
return LOCATION
def location(bot, update):
user = update.message.from_user
user_location = update.message.location
logger.info("Location of %s: %f / %f"
% (user.first_name, user_location.latitude, user_location.longitude))
bot.sendMessage(update.message.chat_id, text='Maybe I can visit you sometime! '
'At last, tell me something about yourself.')
return BIO
def skip_location(bot, update):
user = update.message.from_user
logger.info("User %s did not send a location." % user.first_name)
bot.sendMessage(update.message.chat_id, text='You seem a bit paranoid! '
'At last, tell me something about yourself.')
return BIO
def bio(bot, update):
user = update.message.from_user
logger.info("Bio of %s: %s" % (user.first_name, update.message.text))
bot.sendMessage(update.message.chat_id,
text='Thank you! I hope we can talk again some day.')
return ConversationHandler.END
def cancel(bot, update):
user = update.message.from_user
logger.info("User %s canceled the conversation." % user.first_name)
bot.sendMessage(update.message.chat_id,
text='Bye! I hope we can talk again some day.')
return ConversationHandler.END
def error(bot, update, error):
logger.warn('Update "%s" caused error "%s"' % (update, error))
def main():
# Create the EventHandler and pass it your bot's token.
updater = Updater("TOKEN")
# Get the dispatcher to register handlers
dp = updater.dispatcher
# Add conversation handler with the states GENDER, PHOTO, LOCATION and BIO
conv_handler = ConversationHandler(
entry_points=[CommandHandler('start', start)],
states={
GENDER: [RegexHandler('^(Boy|Girl|Other)$', gender)],
PHOTO: [MessageHandler([Filters.photo], photo),
CommandHandler('skip', skip_photo)],
LOCATION: [MessageHandler([Filters.location], location),
CommandHandler('skip', skip_location)],
BIO: [MessageHandler([Filters.text], bio)]
},
fallbacks=[CommandHandler('cancel', cancel)]
)
dp.add_handler(conv_handler)
# log all errors
dp.add_error_handler(error)
# Start the Bot
updater.start_polling()
# Run the bot until the you presses Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == '__main__':
main()

View file

@ -31,8 +31,9 @@ from .regexhandler import RegexHandler
from .stringcommandhandler import StringCommandHandler
from .stringregexhandler import StringRegexHandler
from .typehandler import TypeHandler
from .conversationhandler import ConversationHandler
__all__ = ('Dispatcher', 'JobQueue', 'Job', 'Updater', 'CallbackQueryHandler',
'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler',
'MessageHandler', 'Filters', 'RegexHandler', 'StringCommandHandler',
'StringRegexHandler', 'TypeHandler')
'StringRegexHandler', 'TypeHandler', 'ConversationHandler')

View file

@ -52,7 +52,7 @@ class CallbackQueryHandler(Handler):
def handle_update(self, update, dispatcher):
optional_args = self.collect_optional_args(dispatcher)
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.CallbackQueryHandler."

View file

@ -53,7 +53,7 @@ class ChosenInlineResultHandler(Handler):
def handle_update(self, update, dispatcher):
optional_args = self.collect_optional_args(dispatcher)
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.ChosenInlineResultHandler."

View file

@ -83,7 +83,7 @@ class CommandHandler(Handler):
if self.pass_args:
optional_args['args'] = message.text.split(' ')[1:]
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.CommandHandler."

View file

@ -0,0 +1,222 @@
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2016
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
""" This module contains the ConversationHandler """
import logging
from telegram import Update
from telegram.ext import Handler
from telegram.utils.promise import Promise
class ConversationHandler(Handler):
"""
A handler to hold a conversation with a user by managing four collections of other handlers.
The first collection, a ``list`` named ``entry_points``, is used to initiate the conversation,
for example with a ``CommandHandler`` or ``RegexHandler``.
The second collection, a ``dict`` named ``states``, contains the different conversation steps
and one or more associated handlers that should be used if the user sends a message when the
conversation with them is currently in that state. You will probably use mostly
``MessageHandler`` and ``RegexHandler`` here.
The third collection, a ``list`` named ``fallbacks``, is used if the user is currently in a
conversation but the state has either no associated handler or the handler that is associated
to the state is inappropriate for the update, for example if the update contains a command, but
a regular text message is expected. You could use this for a ``/cancel`` command or to let the
user know their message was not recognized.
The fourth, optional collection of handlers, a ``list`` named ``timed_out_behavior`` is used if
the wait for ``run_async`` takes longer than defined in ``run_async_timeout``. For example,
you can let the user know that they should wait for a bit before they can continue.
To change the state of conversation, the callback function of a handler must return the new
state after responding to the user. If it does not return anything (returning ``None`` by
default), the state will not change. To end the conversation, the callback function must
return ``CallbackHandler.END`` or ``-1``.
Args:
entry_points (list): A list of ``Handler`` objects that can trigger the start of the
conversation. The first handler which ``check_update`` method returns ``True`` will be
used. If all return ``False``, the update is not handled.
states (dict): A ``dict[object: list[Handler]]`` that defines the different states of
conversation a user can be in and one or more associated ``Handler`` objects that
should be used in that state. The first handler which ``check_update`` method returns
``True`` will be used.
fallbacks (list): A list of handlers that might be used if the user is in a conversation,
but every handler for their current state returned ``False`` on ``check_update``.
The first handler which ``check_update`` method returns ``True`` will be used. If all
return ``False``, the update is not handled.
allow_reentry (Optional[bool]): If set to ``True``, a user that is currently in a
conversation can restart the conversation by triggering one of the entry points.
run_async_timeout (Optional[float]): If the previous handler for this user was running
asynchronously using the ``run_async`` decorator, it might not be finished when the
next message arrives. This timeout defines how long the conversation handler should
wait for the next state to be computed. The default is ``None`` which means it will
wait indefinitely.
timed_out_behavior (Optional[list]): A list of handlers that might be used if
the wait for ``run_async`` timed out. The first handler which ``check_update`` method
returns ``True`` will be used. If all return ``False``, the update is not handled.
"""
END = -1
def __init__(self,
entry_points,
states,
fallbacks,
allow_reentry=False,
run_async_timeout=None,
timed_out_behavior=None):
self.entry_points = entry_points
""":type: list[telegram.ext.Handler]"""
self.states = states
""":type: dict[str: telegram.ext.Handler]"""
self.fallbacks = fallbacks
""":type: list[telegram.ext.Handler]"""
self.allow_reentry = allow_reentry
self.run_async_timeout = run_async_timeout
self.timed_out_behavior = timed_out_behavior
""":type: list[telegram.ext.Handler]"""
self.conversations = dict()
""":type: dict[(int, int): str]"""
self.current_conversation = None
self.current_handler = None
self.logger = logging.getLogger(__name__)
def check_update(self, update):
if not isinstance(update, Update):
return False
user = None
chat = None
if update.message:
user = update.message.from_user
chat = update.message.chat
elif update.edited_message:
user = update.edited_message.from_user
chat = update.edited_message.chat
elif update.inline_query:
user = update.inline_query.from_user
elif update.chosen_inline_result:
user = update.chosen_inline_result.from_user
elif update.callback_query:
user = update.callback_query.from_user
chat = update.callback_query.message.chat if update.callback_query.message else None
else:
return False
key = (chat.id, user.id) if chat else (None, user.id)
state = self.conversations.get(key)
# Resolve promises
if isinstance(state, tuple) and len(state) is 2 and isinstance(state[1], Promise):
self.logger.debug('waiting for promise...')
old_state, new_state = state
new_state.result(timeout=self.run_async_timeout)
if new_state.done.is_set():
self.update_state(new_state.result(), key)
state = self.conversations.get(key)
else:
for candidate in (self.timed_out_behavior or []):
if candidate.check_update(update):
# Save the current user and the selected handler for handle_update
self.current_conversation = key
self.current_handler = candidate
return True
else:
return False
self.logger.debug('selecting conversation %s with state %s' % (str(key), str(state)))
handler = None
# Search entry points for a match
if state is None or self.allow_reentry:
for entry_point in self.entry_points:
if entry_point.check_update(update):
handler = entry_point
break
else:
if state is None:
return False
# Get the handler list for current state, if we didn't find one yet and we're still here
if state is not None and not handler:
handlers = self.states.get(state)
for candidate in (handlers or []):
if candidate.check_update(update):
handler = candidate
break
# Find a fallback handler if all other handlers fail
else:
for fallback in self.fallbacks:
if fallback.check_update(update):
handler = fallback
break
else:
return False
# Save the current user and the selected handler for handle_update
self.current_conversation = key
self.current_handler = handler
return True
def handle_update(self, update, dispatcher):
new_state = self.current_handler.handle_update(update, dispatcher)
self.update_state(new_state, self.current_conversation)
def update_state(self, new_state, key):
if new_state == self.END:
del self.conversations[key]
elif isinstance(new_state, Promise):
self.conversations[key] = (self.conversations[key], new_state)
elif new_state is not None:
self.conversations[key] = new_state

View file

@ -30,6 +30,7 @@ from telegram import (TelegramError, NullHandler)
from telegram.utils import request
from telegram.ext.handler import Handler
from telegram.utils.deprecate import deprecate
from telegram.utils.promise import Promise
logging.getLogger(__name__).addHandler(NullHandler())
@ -45,17 +46,16 @@ def _pooled():
A wrapper to run a thread in a thread pool
"""
while 1:
try:
func, args, kwargs = ASYNC_QUEUE.get()
promise = ASYNC_QUEUE.get()
# If unpacking fails, the thread pool is being closed from Updater._join_async_threads
except TypeError:
if not isinstance(promise, Promise):
logging.getLogger(__name__).debug("Closing run_async thread %s/%d" %
(current_thread().getName(), len(ASYNC_THREADS)))
break
try:
func(*args, **kwargs)
promise.run()
except:
logging.getLogger(__name__).exception("run_async function raised exception")
@ -80,7 +80,9 @@ def run_async(func):
"""
A wrapper to run a function in a thread
"""
ASYNC_QUEUE.put((func, args, kwargs))
promise = Promise(func, args, kwargs)
ASYNC_QUEUE.put(promise)
return promise
return async_func

View file

@ -63,11 +63,13 @@ class Handler(object):
"""
This method is called if it was determined that an update should indeed
be handled by this instance. It should also be overridden, but in most
cases call self.callback(dispatcher.bot, update), possibly along with
optional arguments.
cases call ``self.callback(dispatcher.bot, update)``, possibly along with
optional arguments. To work with the ``ConversationHandler``, this method should return the
value returned from ``self.callback``
Args:
update (object): The update to be handled
dispatcher (Dispatcher): The dispatcher to collect optional args
"""
raise NotImplementedError

View file

@ -52,7 +52,7 @@ class InlineQueryHandler(Handler):
def handle_update(self, update, dispatcher):
optional_args = self.collect_optional_args(dispatcher)
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.InlineQueryHandler."

View file

@ -136,7 +136,7 @@ class MessageHandler(Handler):
def handle_update(self, update, dispatcher):
optional_args = self.collect_optional_args(dispatcher)
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.MessageHandler."

View file

@ -89,7 +89,7 @@ class RegexHandler(Handler):
if self.pass_groupdict:
optional_args['groupdict'] = match.groupdict()
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.RegexHandler."

View file

@ -68,7 +68,7 @@ class StringCommandHandler(Handler):
if self.pass_args:
optional_args['args'] = update.split(' ')[1:]
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.StringCommandHandler."

View file

@ -84,7 +84,7 @@ class StringRegexHandler(Handler):
if self.pass_groupdict:
optional_args['groupdict'] = match.groupdict()
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.StringRegexHandler."

View file

@ -65,7 +65,7 @@ class TypeHandler(Handler):
def handle_update(self, update, dispatcher):
optional_args = self.collect_optional_args(dispatcher)
self.callback(dispatcher.bot, update, **optional_args)
return self.callback(dispatcher.bot, update, **optional_args)
# old non-PEP8 Handler methods
m = "telegram.TypeHandler."

46
telegram/utils/promise.py Normal file
View file

@ -0,0 +1,46 @@
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2016
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
""" This module contains the Promise class """
from threading import Event
class Promise(object):
"""A simple Promise implementation for the run_async decorator"""
def __init__(self, pooled_function, args, kwargs):
self.pooled_function = pooled_function
self.args = args
self.kwargs = kwargs
self.done = Event()
self._result = None
def run(self):
try:
self._result = self.pooled_function(*self.args, **self.kwargs)
except:
raise
finally:
self.done.set()
def result(self, timeout=None):
self.done.wait(timeout=timeout)
return self._result

View file

@ -0,0 +1,164 @@
#!/usr/bin/env python
# encoding: utf-8
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2016
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""
This module contains a object that represents Tests for ConversationHandler
"""
import logging
import sys
from time import sleep
if sys.version_info[0:2] == (2, 6):
import unittest2 as unittest
else:
import unittest
try:
# python2
from urllib2 import urlopen, Request, HTTPError
except ImportError:
# python3
from urllib.request import Request, urlopen
from urllib.error import HTTPError
sys.path.append('.')
from telegram import Update, Message, TelegramError, User, Chat, Bot
from telegram.utils.request import stop_con_pool
from telegram.ext import *
from tests.base import BaseTest
from tests.test_updater import MockBot
# Enable logging
root = logging.getLogger()
root.setLevel(logging.DEBUG)
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.WARN)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s ' '- %(message)s')
ch.setFormatter(formatter)
root.addHandler(ch)
class ConversationHandlerTest(BaseTest, unittest.TestCase):
"""
This object represents the tests for the conversation handler.
"""
# State definitions
# At first we're thirsty. Then we brew coffee, we drink it
# and then we can start coding!
END, THIRSTY, BREWING, DRINKING, CODING = range(-1, 4)
# Test related
def setUp(self):
self.updater = None
self.current_state = dict()
self.entry_points = [CommandHandler('start', self.start)]
self.states = {self.THIRSTY: [CommandHandler('brew', self.brew),
CommandHandler('wait', self.start)],
self.BREWING: [CommandHandler('pourCoffee', self.drink)],
self.DRINKING: [CommandHandler('startCoding', self.code),
CommandHandler('drinkMore', self.drink)],
self.CODING: [CommandHandler('keepCoding', self.code),
CommandHandler('gettingThirsty', self.start),
CommandHandler('drinkMore', self.drink)],}
self.fallbacks = [CommandHandler('eat', self.start)]
def _setup_updater(self, *args, **kwargs):
stop_con_pool()
bot = MockBot(*args, **kwargs)
self.updater = Updater(workers=2, bot=bot)
def tearDown(self):
if self.updater is not None:
self.updater.stop()
stop_con_pool()
def reset(self):
self.current_state = dict()
# State handlers
def _set_state(self, update, state):
self.current_state[update.message.from_user.id] = state
return state
def _get_state(self, user_id):
return self.current_state[user_id]
# Actions
def start(self, bot, update):
return self._set_state(update, self.THIRSTY)
def brew(self, bot, update):
return self._set_state(update, self.BREWING)
def drink(self, bot, update):
return self._set_state(update, self.DRINKING)
def code(self, bot, update):
return self._set_state(update, self.CODING)
# Tests
def test_addConversationHandler(self):
self._setup_updater('', messages=0)
d = self.updater.dispatcher
user = User(first_name="Misses Test", id=123)
second_user = User(first_name="Mister Test", id=124)
handler = ConversationHandler(entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks)
d.add_handler(handler)
queue = self.updater.start_polling(0.01)
# User one, starts the state machine.
message = Message(0, user, None, None, text="/start")
queue.put(Update(update_id=0, message=message))
sleep(.1)
self.assertTrue(self.current_state[user.id] == self.THIRSTY)
# The user is thirsty and wants to brew coffee.
message = Message(0, user, None, None, text="/brew")
queue.put(Update(update_id=0, message=message))
sleep(.1)
self.assertTrue(self.current_state[user.id] == self.BREWING)
# Lets see if an invalid command makes sure, no state is changed.
message = Message(0, user, None, None, text="/nothing")
queue.put(Update(update_id=0, message=message))
sleep(.1)
self.assertTrue(self.current_state[user.id] == self.BREWING)
# Lets see if the state machine still works by pouring coffee.
message = Message(0, user, None, None, text="/pourCoffee")
queue.put(Update(update_id=0, message=message))
sleep(.1)
self.assertTrue(self.current_state[user.id] == self.DRINKING)
# Let's now verify that for another user, who did not start yet,
# the state has not been changed.
message = Message(0, second_user, None, None, text="/brew")
queue.put(Update(update_id=0, message=message))
sleep(.1)
self.assertRaises(KeyError, self._get_state, user_id=second_user.id)
if __name__ == '__main__':
unittest.main()