mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-12-29 15:49:02 +01:00
5ab82a9c2b
Co-authored-by: poolitzer <github@poolitzer.eu> Co-authored-by: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com>
176 lines
5.9 KiB
Python
176 lines
5.9 KiB
Python
#!/usr/bin/env python
|
|
# pylint: disable=unused-argument
|
|
# This program is dedicated to the public domain under the CC0 license.
|
|
|
|
"""
|
|
First, a few callback functions are defined. Then, those functions are passed to
|
|
the Application and registered at their respective places.
|
|
Then, the bot is started and runs until we press Ctrl-C on the command line.
|
|
|
|
Usage:
|
|
Example of a bot-user conversation using ConversationHandler.
|
|
Send /start to initiate the conversation.
|
|
Press Ctrl-C on the command line or send a signal to the process to stop the
|
|
bot.
|
|
"""
|
|
|
|
import logging
|
|
|
|
from telegram import ReplyKeyboardMarkup, ReplyKeyboardRemove, Update
|
|
from telegram.ext import (
|
|
Application,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
ConversationHandler,
|
|
MessageHandler,
|
|
PicklePersistence,
|
|
filters,
|
|
)
|
|
|
|
# Enable logging
|
|
logging.basicConfig(
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
|
|
)
|
|
# set higher logging level for httpx to avoid all GET and POST requests being logged
|
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
CHOOSING, TYPING_REPLY, TYPING_CHOICE = range(3)
|
|
|
|
reply_keyboard = [
|
|
["Age", "Favourite colour"],
|
|
["Number of siblings", "Something else..."],
|
|
["Done"],
|
|
]
|
|
markup = ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True)
|
|
|
|
|
|
def facts_to_str(user_data: dict[str, str]) -> str:
|
|
"""Helper function for formatting the gathered user info."""
|
|
facts = [f"{key} - {value}" for key, value in user_data.items()]
|
|
return "\n".join(facts).join(["\n", "\n"])
|
|
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
|
"""Start the conversation, display any stored data and ask user for input."""
|
|
reply_text = "Hi! My name is Doctor Botter."
|
|
if context.user_data:
|
|
reply_text += (
|
|
f" You already told me your {', '.join(context.user_data.keys())}. Why don't you "
|
|
"tell me something more about yourself? Or change anything I already know."
|
|
)
|
|
else:
|
|
reply_text += (
|
|
" I will hold a more complex conversation with you. Why don't you tell me "
|
|
"something about yourself?"
|
|
)
|
|
await update.message.reply_text(reply_text, reply_markup=markup)
|
|
|
|
return CHOOSING
|
|
|
|
|
|
async def regular_choice(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
|
"""Ask the user for info about the selected predefined choice."""
|
|
text = update.message.text.lower()
|
|
context.user_data["choice"] = text
|
|
if context.user_data.get(text):
|
|
reply_text = (
|
|
f"Your {text}? I already know the following about that: {context.user_data[text]}"
|
|
)
|
|
else:
|
|
reply_text = f"Your {text}? Yes, I would love to hear about that!"
|
|
await update.message.reply_text(reply_text)
|
|
|
|
return TYPING_REPLY
|
|
|
|
|
|
async def custom_choice(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
|
"""Ask the user for a description of a custom category."""
|
|
await update.message.reply_text(
|
|
'Alright, please send me the category first, for example "Most impressive skill"'
|
|
)
|
|
|
|
return TYPING_CHOICE
|
|
|
|
|
|
async def received_information(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
|
"""Store info provided by user and ask for the next category."""
|
|
text = update.message.text
|
|
category = context.user_data["choice"]
|
|
context.user_data[category] = text.lower()
|
|
del context.user_data["choice"]
|
|
|
|
await update.message.reply_text(
|
|
"Neat! Just so you know, this is what you already told me:"
|
|
f"{facts_to_str(context.user_data)}"
|
|
"You can tell me more, or change your opinion on something.",
|
|
reply_markup=markup,
|
|
)
|
|
|
|
return CHOOSING
|
|
|
|
|
|
async def show_data(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
|
"""Display the gathered info."""
|
|
await update.message.reply_text(
|
|
f"This is what you already told me: {facts_to_str(context.user_data)}"
|
|
)
|
|
|
|
|
|
async def done(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
|
"""Display the gathered info and end the conversation."""
|
|
if "choice" in context.user_data:
|
|
del context.user_data["choice"]
|
|
|
|
await update.message.reply_text(
|
|
f"I learned these facts about you: {facts_to_str(context.user_data)}Until next time!",
|
|
reply_markup=ReplyKeyboardRemove(),
|
|
)
|
|
return ConversationHandler.END
|
|
|
|
|
|
def main() -> None:
|
|
"""Run the bot."""
|
|
# Create the Application and pass it your bot's token.
|
|
persistence = PicklePersistence(filepath="conversationbot")
|
|
application = Application.builder().token("TOKEN").persistence(persistence).build()
|
|
|
|
# Add conversation handler with the states CHOOSING, TYPING_CHOICE and TYPING_REPLY
|
|
conv_handler = ConversationHandler(
|
|
entry_points=[CommandHandler("start", start)],
|
|
states={
|
|
CHOOSING: [
|
|
MessageHandler(
|
|
filters.Regex("^(Age|Favourite colour|Number of siblings)$"), regular_choice
|
|
),
|
|
MessageHandler(filters.Regex("^Something else...$"), custom_choice),
|
|
],
|
|
TYPING_CHOICE: [
|
|
MessageHandler(
|
|
filters.TEXT & ~(filters.COMMAND | filters.Regex("^Done$")), regular_choice
|
|
)
|
|
],
|
|
TYPING_REPLY: [
|
|
MessageHandler(
|
|
filters.TEXT & ~(filters.COMMAND | filters.Regex("^Done$")),
|
|
received_information,
|
|
)
|
|
],
|
|
},
|
|
fallbacks=[MessageHandler(filters.Regex("^Done$"), done)],
|
|
name="my_conversation",
|
|
persistent=True,
|
|
)
|
|
|
|
application.add_handler(conv_handler)
|
|
|
|
show_data_handler = CommandHandler("show_data", show_data)
|
|
application.add_handler(show_data_handler)
|
|
|
|
# Run the bot until the user presses Ctrl-C
|
|
application.run_polling(allowed_updates=Update.ALL_TYPES)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|