python-telegram-bot/tests/ext/test_conversationhandler.py
2025-01-01 14:51:12 +01:00

2321 lines
89 KiB
Python

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2025
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""Persistence of conversations is tested in test_basepersistence.py"""
import asyncio
import functools
import logging
from pathlib import Path
from warnings import filterwarnings
import pytest
from telegram import (
CallbackQuery,
Chat,
ChosenInlineResult,
InlineQuery,
Message,
MessageEntity,
PreCheckoutQuery,
ShippingQuery,
Update,
User,
)
from telegram.ext import (
ApplicationBuilder,
ApplicationHandlerStop,
CallbackContext,
CallbackQueryHandler,
ChosenInlineResultHandler,
CommandHandler,
ConversationHandler,
Defaults,
InlineQueryHandler,
JobQueue,
MessageHandler,
PollAnswerHandler,
PollHandler,
PreCheckoutQueryHandler,
ShippingQueryHandler,
StringCommandHandler,
StringRegexHandler,
TypeHandler,
filters,
)
from telegram.warnings import PTBUserWarning
from tests.auxil.build_messages import make_command_message
from tests.auxil.files import PROJECT_ROOT_PATH
from tests.auxil.pytest_classes import PytestBot, make_bot
from tests.auxil.slots import mro_slots
@pytest.fixture(scope="class")
def user1():
return User(first_name="Misses Test", id=123, is_bot=False)
@pytest.fixture(scope="class")
def user2():
return User(first_name="Mister Test", id=124, is_bot=False)
def raise_ahs(func):
@functools.wraps(func) # for checking __repr__
async def decorator(self, *args, **kwargs):
result = await func(self, *args, **kwargs)
if self.raise_app_handler_stop:
raise ApplicationHandlerStop(result)
return result
return decorator
class TestConversationHandler:
"""Persistence of conversations is tested in test_basepersistence.py"""
# State definitions
# At first we're thirsty. Then we brew coffee, we drink it
# and then we can start coding!
END, THIRSTY, BREWING, DRINKING, CODING = range(-1, 4)
# Drinking state definitions (nested)
# At first we're holding the cup. Then we sip coffee, and last we swallow it
HOLDING, SIPPING, SWALLOWING, REPLENISHING, STOPPING = map(chr, range(ord("a"), ord("f")))
current_state, entry_points, states, fallbacks = None, None, None, None
group = Chat(0, Chat.GROUP)
second_group = Chat(1, Chat.GROUP)
raise_app_handler_stop = False
test_flag = False
# Test related
@pytest.fixture(autouse=True)
def _reset(self):
self.raise_app_handler_stop = False
self.test_flag = False
self.current_state = {}
self.entry_points = [CommandHandler("start", self.start)]
self.states = {
self.THIRSTY: [CommandHandler("brew", self.brew), CommandHandler("wait", self.start)],
self.BREWING: [CommandHandler("pourCoffee", self.drink)],
self.DRINKING: [
CommandHandler("startCoding", self.code),
CommandHandler("drinkMore", self.drink),
CommandHandler("end", self.end),
],
self.CODING: [
CommandHandler("keepCoding", self.code),
CommandHandler("gettingThirsty", self.start),
CommandHandler("drinkMore", self.drink),
],
}
self.fallbacks = [CommandHandler("eat", self.start)]
self.is_timeout = False
# for nesting tests
self.nested_states = {
self.THIRSTY: [CommandHandler("brew", self.brew), CommandHandler("wait", self.start)],
self.BREWING: [CommandHandler("pourCoffee", self.drink)],
self.CODING: [
CommandHandler("keepCoding", self.code),
CommandHandler("gettingThirsty", self.start),
CommandHandler("drinkMore", self.drink),
],
}
self.drinking_entry_points = [CommandHandler("hold", self.hold)]
self.drinking_states = {
self.HOLDING: [CommandHandler("sip", self.sip)],
self.SIPPING: [CommandHandler("swallow", self.swallow)],
self.SWALLOWING: [CommandHandler("hold", self.hold)],
}
self.drinking_fallbacks = [
CommandHandler("replenish", self.replenish),
CommandHandler("stop", self.stop),
CommandHandler("end", self.end),
CommandHandler("startCoding", self.code),
CommandHandler("drinkMore", self.drink),
]
self.drinking_entry_points.extend(self.drinking_fallbacks)
# Map nested states to parent states:
self.drinking_map_to_parent = {
# Option 1 - Map a fictional internal state to an external parent state
self.REPLENISHING: self.BREWING,
# Option 2 - Map a fictional internal state to the END state on the parent
self.STOPPING: self.END,
# Option 3 - Map the internal END state to an external parent state
self.END: self.CODING,
# Option 4 - Map an external state to the same external parent state
self.CODING: self.CODING,
# Option 5 - Map an external state to the internal entry point
self.DRINKING: self.DRINKING,
}
# State handlers
def _set_state(self, update, state):
self.current_state[update.message.from_user.id] = state
return state
# Actions
@raise_ahs
async def start(self, update, context):
if isinstance(update, Update):
return self._set_state(update, self.THIRSTY)
return self._set_state(context.bot, self.THIRSTY)
@raise_ahs
async def end(self, update, context):
return self._set_state(update, self.END)
@raise_ahs
async def start_end(self, update, context):
return self._set_state(update, self.END)
@raise_ahs
async def start_none(self, update, context):
return self._set_state(update, None)
@raise_ahs
async def brew(self, update, context):
if isinstance(update, Update):
return self._set_state(update, self.BREWING)
return self._set_state(context.bot, self.BREWING)
@raise_ahs
async def drink(self, update, context):
return self._set_state(update, self.DRINKING)
@raise_ahs
async def code(self, update, context):
return self._set_state(update, self.CODING)
@raise_ahs
async def passout(self, update, context):
assert update.message.text == "/brew"
assert isinstance(update, Update)
self.is_timeout = True
@raise_ahs
async def passout2(self, update, context):
assert isinstance(update, Update)
self.is_timeout = True
@raise_ahs
async def passout_context(self, update, context):
assert update.message.text == "/brew"
assert isinstance(context, CallbackContext)
self.is_timeout = True
@raise_ahs
async def passout2_context(self, update, context):
assert isinstance(context, CallbackContext)
self.is_timeout = True
# Drinking actions (nested)
@raise_ahs
async def hold(self, update, context):
return self._set_state(update, self.HOLDING)
@raise_ahs
async def sip(self, update, context):
return self._set_state(update, self.SIPPING)
@raise_ahs
async def swallow(self, update, context):
return self._set_state(update, self.SWALLOWING)
@raise_ahs
async def replenish(self, update, context):
return self._set_state(update, self.REPLENISHING)
@raise_ahs
async def stop(self, update, context):
return self._set_state(update, self.STOPPING)
def test_slot_behaviour(self):
handler = ConversationHandler(entry_points=[], states={}, fallbacks=[])
for attr in handler.__slots__:
assert getattr(handler, attr, "err") != "err", f"got extra slot '{attr}'"
assert len(mro_slots(handler)) == len(set(mro_slots(handler))), "duplicate slot"
def test_init(self):
entry_points = []
states = {}
fallbacks = []
map_to_parent = {}
ch = ConversationHandler(
entry_points=entry_points,
states=states,
fallbacks=fallbacks,
per_chat="per_chat",
per_user="per_user",
per_message="per_message",
persistent="persistent",
name="name",
allow_reentry="allow_reentry",
conversation_timeout=42,
map_to_parent=map_to_parent,
)
assert ch.entry_points is entry_points
assert ch.states is states
assert ch.fallbacks is fallbacks
assert ch.map_to_parent is map_to_parent
assert ch.per_chat == "per_chat"
assert ch.per_user == "per_user"
assert ch.per_message == "per_message"
assert ch.persistent == "persistent"
assert ch.name == "name"
assert ch.allow_reentry == "allow_reentry"
def test_init_persistent_no_name(self):
with pytest.raises(ValueError, match="can't be persistent when handler is unnamed"):
ConversationHandler(
self.entry_points, states=self.states, fallbacks=[], persistent=True
)
def test_repr_no_truncation(self):
# ConversationHandler's __repr__ is not inherited from BaseHandler.
ch = ConversationHandler(
name="test_handler",
entry_points=[],
states=self.drinking_states,
fallbacks=[],
)
assert repr(ch) == (
"ConversationHandler[name=test_handler, "
"states={'a': [CommandHandler[callback=TestConversationHandler.sip]], "
"'b': [CommandHandler[callback=TestConversationHandler.swallow]], "
"'c': [CommandHandler[callback=TestConversationHandler.hold]]}]"
)
def test_repr_with_truncation(self):
from copy import copy
states = copy(self.drinking_states)
# there are exactly 3 drinking states. adding one more to make sure it's truncated
states["extra_to_be_truncated"] = [CommandHandler("foo", self.start)]
ch = ConversationHandler(
name="test_handler",
entry_points=[],
states=states,
fallbacks=[],
)
assert repr(ch) == (
"ConversationHandler[name=test_handler, "
"states={'a': [CommandHandler[callback=TestConversationHandler.sip]], "
"'b': [CommandHandler[callback=TestConversationHandler.swallow]], "
"'c': [CommandHandler[callback=TestConversationHandler.hold]], ...}]"
)
async def test_check_update_returns_non(self, app, user1):
"""checks some cases where updates should not be handled"""
conv_handler = ConversationHandler([], {}, [], per_message=True, per_chat=True)
assert not conv_handler.check_update("not an Update")
assert not conv_handler.check_update(Update(0))
assert not conv_handler.check_update(
Update(0, callback_query=CallbackQuery("1", from_user=user1, chat_instance="1"))
)
async def test_handlers_generate_warning(self, recwarn):
"""this function tests all handler + per_* setting combinations."""
# the warning message action needs to be set to always,
# otherwise only the first occurrence will be issued
filterwarnings(action="always", category=PTBUserWarning)
# this class doesn't do anything, its just not the Update class
class NotUpdate:
pass
recwarn.clear()
# this conversation handler has the string, string_regex, Pollhandler and TypeHandler
# which should all generate a warning no matter the per_* setting. TypeHandler should
# not when the class is Update
ConversationHandler(
entry_points=[StringCommandHandler("code", self.code)],
states={
self.BREWING: [
StringRegexHandler("code", self.code),
PollHandler(self.code),
TypeHandler(NotUpdate, self.code),
],
},
fallbacks=[TypeHandler(Update, self.code)],
)
# these handlers should all raise a warning when per_chat is True
ConversationHandler(
entry_points=[ShippingQueryHandler(self.code)],
states={
self.BREWING: [
InlineQueryHandler(self.code),
PreCheckoutQueryHandler(self.code),
PollAnswerHandler(self.code),
],
},
fallbacks=[ChosenInlineResultHandler(self.code)],
per_chat=True,
)
# the CallbackQueryHandler should *not* raise when per_message is True,
# but any other one should
ConversationHandler(
entry_points=[CallbackQueryHandler(self.code)],
states={
self.BREWING: [CommandHandler("code", self.code)],
},
fallbacks=[CallbackQueryHandler(self.code)],
per_message=True,
)
# the CallbackQueryHandler should raise when per_message is False
ConversationHandler(
entry_points=[CommandHandler("code", self.code)],
states={
self.BREWING: [CommandHandler("code", self.code)],
},
fallbacks=[CallbackQueryHandler(self.code)],
per_message=False,
)
# adding a nested conv to a conversation with timeout should warn
child = ConversationHandler(
entry_points=[CommandHandler("code", self.code)],
states={
self.BREWING: [CommandHandler("code", self.code)],
},
fallbacks=[CommandHandler("code", self.code)],
)
ConversationHandler(
entry_points=[CommandHandler("code", self.code)],
states={
self.BREWING: [child],
},
fallbacks=[CommandHandler("code", self.code)],
conversation_timeout=42,
)
# If per_message is True, per_chat should also be True, since msg ids are not unique
ConversationHandler(
entry_points=[CallbackQueryHandler(self.code, "code")],
states={
self.BREWING: [CallbackQueryHandler(self.code, "code")],
},
fallbacks=[CallbackQueryHandler(self.code, "code")],
per_message=True,
per_chat=False,
)
# the overall number of handlers throwing a warning is 13
assert len(recwarn) == 13
# now we test the messages, they are raised in the order they are inserted
# into the conversation handler
assert (
str(recwarn[0].message)
== "The `ConversationHandler` only handles updates of type `telegram.Update`. "
"StringCommandHandler handles updates of type `str`."
)
assert (
str(recwarn[1].message)
== "The `ConversationHandler` only handles updates of type `telegram.Update`. "
"StringRegexHandler handles updates of type `str`."
)
assert (
str(recwarn[2].message)
== "PollHandler will never trigger in a conversation since it has no information "
"about the chat or the user who voted in it. Do you mean the "
"`PollAnswerHandler`?"
)
assert (
str(recwarn[3].message)
== "The `ConversationHandler` only handles updates of type `telegram.Update`. "
"The TypeHandler is set to handle NotUpdate."
)
per_faq_link = (
" Read this FAQ entry to learn more about the per_* settings: "
"https://github.com/python-telegram-bot/python-telegram-bot/wiki"
"/Frequently-Asked-Questions#what-do-the-per_-settings-in-conversationhandler-do."
)
assert str(recwarn[4].message) == (
"Updates handled by ShippingQueryHandler only have information about the user,"
" so this handler won't ever be triggered if `per_chat=True`." + per_faq_link
)
assert str(recwarn[5].message) == (
"Updates handled by ChosenInlineResultHandler only have information about the user,"
" so this handler won't ever be triggered if `per_chat=True`." + per_faq_link
)
assert str(recwarn[6].message) == (
"Updates handled by InlineQueryHandler only have information about the user,"
" so this handler won't ever be triggered if `per_chat=True`." + per_faq_link
)
assert str(recwarn[7].message) == (
"Updates handled by PreCheckoutQueryHandler only have information about the user,"
" so this handler won't ever be triggered if `per_chat=True`." + per_faq_link
)
assert str(recwarn[8].message) == (
"Updates handled by PollAnswerHandler only have information about the user,"
" so this handler won't ever be triggered if `per_chat=True`." + per_faq_link
)
assert str(recwarn[9].message) == (
"If 'per_message=True', all entry points, state handlers, and fallbacks must be "
"'CallbackQueryHandler', since no other handlers have a message context."
+ per_faq_link
)
assert str(recwarn[10].message) == (
"If 'per_message=False', 'CallbackQueryHandler' will not be tracked for every message."
+ per_faq_link
)
assert (
str(recwarn[11].message)
== "Using `conversation_timeout` with nested conversations is currently not "
"supported. You can still try to use it, but it will likely behave differently"
" from what you expect."
)
assert (
str(recwarn[12].message)
== "If 'per_message=True' is used, 'per_chat=True' should also be used, "
"since message IDs are not globally unique."
)
# this for loop checks if the correct stacklevel is used when generating the warning
for warning in recwarn:
assert warning.category is PTBUserWarning
assert warning.filename == __file__, "incorrect stacklevel!"
@pytest.mark.parametrize(
"attr",
[
"entry_points",
"states",
"fallbacks",
"per_chat",
"per_user",
"per_message",
"name",
"persistent",
"allow_reentry",
"conversation_timeout",
"map_to_parent",
],
indirect=False,
)
def test_immutable(self, attr):
ch = ConversationHandler(entry_points=[], states={}, fallbacks=[])
with pytest.raises(AttributeError, match=f"You can not assign a new value to {attr}"):
setattr(ch, attr, True)
def test_per_all_false(self):
with pytest.raises(ValueError, match="can't all be 'False'"):
ConversationHandler(
entry_points=[],
states={},
fallbacks=[],
per_chat=False,
per_user=False,
per_message=False,
)
@pytest.mark.parametrize("raise_ahs", [True, False])
async def test_basic_and_app_handler_stop(self, app, bot, user1, user2, raise_ahs):
handler = ConversationHandler(
entry_points=self.entry_points, states=self.states, fallbacks=self.fallbacks
)
app.add_handler(handler)
async def callback(_, __):
self.test_flag = True
app.add_handler(TypeHandler(object, callback), group=100)
self.raise_app_handler_stop = raise_ahs
# User one, starts the state machine.
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.THIRSTY
assert self.test_flag == (not raise_ahs)
# The user is thirsty and wants to brew coffee.
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.BREWING
assert self.test_flag == (not raise_ahs)
# Lets see if an invalid command makes sure, no state is changed.
message.text = "/nothing"
message.entities[0].length = len("/nothing")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.BREWING
assert self.test_flag is True
self.test_flag = False
# Lets see if the state machine still works by pouring coffee.
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
assert self.test_flag == (not raise_ahs)
# Let's now verify that for another user, who did not start yet,
# the state has not been changed.
message.from_user = user2
await app.process_update(Update(update_id=0, message=message))
with pytest.raises(KeyError):
self.current_state[user2.id]
async def test_conversation_handler_end(self, caplog, app, bot, user1):
handler = ConversationHandler(
entry_points=self.entry_points, states=self.states, fallbacks=self.fallbacks
)
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=0, message=message))
message.text = "/end"
message.entities[0].length = len("/end")
caplog.clear()
with caplog.at_level(logging.ERROR):
await app.process_update(Update(update_id=0, message=message))
assert len(caplog.records) == 0
assert self.current_state[user1.id] == self.END
# make sure that the conversation has ended by checking that the start command is
# accepted again
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(update_id=0, message=message))
async def test_conversation_handler_fallback(self, app, bot, user1, user2):
handler = ConversationHandler(
entry_points=self.entry_points, states=self.states, fallbacks=self.fallbacks
)
app.add_handler(handler)
# first check if fallback will not trigger start when not started
message = Message(
0,
None,
self.group,
from_user=user1,
text="/eat",
entities=[MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/eat"))],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
with pytest.raises(KeyError):
self.current_state[user1.id]
# User starts the state machine.
message.text = "/start"
message.entities[0].length = len("/start")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.THIRSTY
# The user is thirsty and wants to brew coffee.
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.BREWING
# Now a fallback command is issued
message.text = "/eat"
message.entities[0].length = len("/eat")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.THIRSTY
async def test_unknown_state_warning(self, app, bot, user1, recwarn):
def build_callback(state):
async def callback(_, __):
return state
return callback
handler = ConversationHandler(
entry_points=[CommandHandler("start", build_callback(1))],
states={
1: [TypeHandler(Update, build_callback(69))],
2: [TypeHandler(Update, build_callback(42))],
},
fallbacks=self.fallbacks,
name="xyz",
)
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
try:
await app.process_update(Update(update_id=1, message=message))
except Exception as exc:
print(exc)
raise exc
assert len(recwarn) == 1
assert recwarn[0].category is PTBUserWarning
assert (
Path(recwarn[0].filename)
== PROJECT_ROOT_PATH / "telegram" / "ext" / "_handlers" / "conversationhandler.py"
), "wrong stacklevel!"
assert (
str(recwarn[0].message)
== "'callback' returned state 69 which is unknown to the ConversationHandler xyz."
)
async def test_conversation_handler_per_chat(self, app, bot, user1, user2):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
per_user=False,
)
app.add_handler(handler)
# User one, starts the state machine.
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
# The user is thirsty and wants to brew coffee.
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
# Let's now verify that for another user, who did not start yet,
# the state will be changed because they are in the same group.
message.from_user = user2
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=0, message=message))
# Check that we're in the DRINKING state by checking that the corresponding command
# is accepted
message.from_user = user1
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
assert handler.check_update(Update(update_id=0, message=message))
message.from_user = user2
assert handler.check_update(Update(update_id=0, message=message))
async def test_conversation_handler_per_user(self, app, bot, user1):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
per_chat=False,
)
app.add_handler(handler)
# User one, starts the state machine.
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
# First check that updates without user won't be handled
message.from_user = None
assert not handler.check_update(Update(update_id=0, message=message))
message.from_user = user1
async with app:
await app.process_update(Update(update_id=0, message=message))
# The user is thirsty and wants to brew coffee.
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
# Let's now verify that for the same user in a different group, the state will still be
# updated
message.chat = self.second_group
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=0, message=message))
# Check that we're in the DRINKING state by checking that the corresponding command
# is accepted
message.chat = self.group
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
assert handler.check_update(Update(update_id=0, message=message))
message.chat = self.second_group
assert handler.check_update(Update(update_id=0, message=message))
@pytest.mark.parametrize("inline", [True, False])
@pytest.mark.filterwarnings("ignore: If 'per_message=True' is used, 'per_chat=True'")
async def test_conversation_handler_per_message(self, app, bot, user1, user2, inline):
async def entry(update, context):
return 1
async def one(update, context):
return 2
async def two(update, context):
return ConversationHandler.END
handler = ConversationHandler(
entry_points=[CallbackQueryHandler(entry)],
states={
1: [CallbackQueryHandler(one, pattern="^1$")],
2: [CallbackQueryHandler(two, pattern="^2$")],
},
fallbacks=[],
per_message=True,
per_chat=not inline,
)
app.add_handler(handler)
# User one, starts the state machine.
message = (
Message(0, None, self.group, from_user=user1, text="msg w/ inlinekeyboard")
if not inline
else None
)
if message:
message.set_bot(bot)
message._unfreeze()
inline_message_id = "42" if inline else None
async with app:
cbq_1 = CallbackQuery(
0,
user1,
None,
message=message,
data="1",
inline_message_id=inline_message_id,
)
cbq_1.set_bot(bot)
cbq_2 = CallbackQuery(
0,
user1,
None,
message=message,
data="2",
inline_message_id=inline_message_id,
)
cbq_2.set_bot(bot)
cbq_2._unfreeze()
await app.process_update(Update(update_id=0, callback_query=cbq_1))
# Make sure that we're in the correct state
assert handler.check_update(Update(0, callback_query=cbq_1))
assert not handler.check_update(Update(0, callback_query=cbq_2))
await app.process_update(Update(update_id=0, callback_query=cbq_1))
# Make sure that we're in the correct state
assert not handler.check_update(Update(0, callback_query=cbq_1))
assert handler.check_update(Update(0, callback_query=cbq_2))
# Let's now verify that for a different user in the same group, the state will not be
# updated
cbq_2.from_user = user2
await app.process_update(Update(update_id=0, callback_query=cbq_2))
cbq_2.from_user = user1
assert not handler.check_update(Update(0, callback_query=cbq_1))
assert handler.check_update(Update(0, callback_query=cbq_2))
async def test_end_on_first_message(self, app, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler("start", self.start_end)], states={}, fallbacks=[]
)
app.add_handler(handler)
# User starts the state machine and immediately ends it.
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
assert handler.check_update(Update(update_id=0, message=message))
async def test_end_on_first_message_non_blocking_handler(self, app, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler("start", callback=self.start_end, block=False)],
states={},
fallbacks=[],
)
app.add_handler(handler)
# User starts the state machine with a non-blocking function that immediately ends the
# conversation. non-blocking results are resolved when the users state is queried next
# time.
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
# give the task a chance to finish
await asyncio.sleep(0.1)
# Let's check that processing the same update again is accepted. this confirms that
# a) the pending state is correctly resolved
# b) the conversation has ended
assert handler.check_update(Update(0, message=message))
async def test_none_on_first_message(self, app, bot, user1):
handler = ConversationHandler(
entry_points=[MessageHandler(filters.ALL, self.start_none)], states={}, fallbacks=[]
)
app.add_handler(handler)
# User starts the state machine and a callback function returns None
message = Message(0, None, self.group, from_user=user1, text="/start")
message.set_bot(bot)
message._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
# Check that the same message is accepted again, i.e. the conversation immediately
# ended
assert handler.check_update(Update(0, message=message))
async def test_none_on_first_message_non_blocking_handler(self, app, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler("start", self.start_none, block=False)],
states={},
fallbacks=[],
)
app.add_handler(handler)
# User starts the state machine with a non-blocking handler that returns None
# non-blocking results are resolved when the users state is queried next time.
message = Message(
0,
None,
self.group,
text="/start",
from_user=user1,
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
# Give the task a chance to finish
await asyncio.sleep(0.1)
# Let's check that processing the same update again is accepted. this confirms that
# a) the pending state is correctly resolved
# b) the conversation has ended
assert handler.check_update(Update(0, message=message))
async def test_per_chat_message_without_chat(self, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler("start", self.start_end)], states={}, fallbacks=[]
)
cbq = CallbackQuery(0, user1, None, None)
cbq.set_bot(bot)
update = Update(0, callback_query=cbq)
assert not handler.check_update(update)
async def test_channel_message_without_chat(self, bot):
handler = ConversationHandler(
entry_points=[MessageHandler(filters.ALL, self.start_end)], states={}, fallbacks=[]
)
message = Message(0, date=None, chat=Chat(0, Chat.CHANNEL, "Misses Test"))
message.set_bot(bot)
message._unfreeze()
update = Update(0, channel_post=message)
assert not handler.check_update(update)
update = Update(0, edited_channel_post=message)
assert not handler.check_update(update)
async def test_all_update_types(self, app, bot, user1):
handler = ConversationHandler(
entry_points=[CommandHandler("start", self.start_end)], states={}, fallbacks=[]
)
message = Message(0, None, self.group, from_user=user1, text="ignore")
message.set_bot(bot)
message._unfreeze()
callback_query = CallbackQuery(0, user1, None, message=message, data="data")
callback_query.set_bot(bot)
chosen_inline_result = ChosenInlineResult(0, user1, "query")
chosen_inline_result.set_bot(bot)
inline_query = InlineQuery(0, user1, "query", offset="")
inline_query.set_bot(bot)
pre_checkout_query = PreCheckoutQuery(0, user1, "USD", 100, [])
pre_checkout_query.set_bot(bot)
shipping_query = ShippingQuery(0, user1, [], None)
shipping_query.set_bot(bot)
assert not handler.check_update(Update(0, callback_query=callback_query))
assert not handler.check_update(Update(0, chosen_inline_result=chosen_inline_result))
assert not handler.check_update(Update(0, inline_query=inline_query))
assert not handler.check_update(Update(0, message=message))
assert not handler.check_update(Update(0, pre_checkout_query=pre_checkout_query))
assert not handler.check_update(Update(0, shipping_query=shipping_query))
@pytest.mark.parametrize("jq", [True, False])
async def test_no_running_job_queue_warning(self, app, bot, user1, recwarn, jq):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
if not jq:
app = ApplicationBuilder().token(bot.token).job_queue(None).build()
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.5)
if jq:
assert len(recwarn) == 1
else:
assert len(recwarn) == 2
assert str(recwarn[0].message if jq else recwarn[1].message).startswith(
"Ignoring `conversation_timeout`"
)
assert ("is not running" if jq else "No `JobQueue` set up.") in str(recwarn[0].message)
for warning in recwarn:
assert warning.category is PTBUserWarning
assert (
Path(warning.filename)
== PROJECT_ROOT_PATH
/ "telegram"
/ "ext"
/ "_handlers"
/ "conversationhandler.py"
), "wrong stacklevel!"
# now set app.job_queue back to it's original value
async def test_schedule_job_exception(self, app, bot, user1, monkeypatch, caplog):
def mocked_run_once(*a, **kw):
raise Exception("job error")
class DictJB(JobQueue):
pass
app = ApplicationBuilder().token(bot.token).job_queue(DictJB()).build()
monkeypatch.setattr(app.job_queue, "run_once", mocked_run_once)
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=100,
)
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.start()
with caplog.at_level(logging.ERROR):
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.5)
assert len(caplog.records) == 1
assert caplog.records[0].message == "Failed to schedule timeout."
assert caplog.records[0].name == "telegram.ext.ConversationHandler"
assert str(caplog.records[0].exc_info[1]) == "job error"
await app.stop()
@pytest.mark.parametrize(argnames="test_type", argvalues=["none", "exception"])
async def test_non_blocking_exception_or_none(self, app, bot, user1, caplog, test_type):
"""Here we make sure that when a non-blocking handler raises an
exception or returns None, the state isn't changed.
"""
error = Exception("task exception")
async def conv_entry(*a, **kw):
return 1
async def raise_error(*a, **kw):
if test_type == "none":
return
raise error
handler = ConversationHandler(
entry_points=[CommandHandler("start", conv_entry)],
states={1: [MessageHandler(filters.Text(["error"]), raise_error)]},
fallbacks=self.fallbacks,
block=False,
)
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
# start the conversation
async with app:
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.1)
message.text = "error"
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.1)
caplog.clear()
with caplog.at_level(logging.ERROR):
# This also makes sure that we're still in the same state
assert handler.check_update(Update(0, message=message))
if test_type == "exception":
assert len(caplog.records) == 1
assert caplog.records[0].name == "telegram.ext.ConversationHandler"
assert (
caplog.records[0].message
== "Task function raised exception. Falling back to old state 1"
)
assert caplog.records[0].exc_info[1] is None
else:
assert len(caplog.records) == 0
async def test_non_blocking_entry_point_exception(self, app, bot, user1, caplog):
"""Here we make sure that when a non-blocking entry point raises an
exception, the state isn't changed.
"""
error = Exception("task exception")
async def raise_error(*a, **kw):
raise error
handler = ConversationHandler(
entry_points=[CommandHandler("start", raise_error, block=False)],
states={},
fallbacks=self.fallbacks,
)
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
# start the conversation
async with app:
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.1)
caplog.clear()
with caplog.at_level(logging.ERROR):
# This also makes sure that we're still in the same state
assert handler.check_update(Update(0, message=message))
assert len(caplog.records) == 1
assert caplog.records[0].name == "telegram.ext.ConversationHandler"
assert (
caplog.records[0].message
== "Task function raised exception. Falling back to old state None"
)
assert caplog.records[0].exc_info[1] is None
async def test_conversation_timeout(self, app, bot, user1):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
# Start state machine, then reach timeout
start_message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
start_message.set_bot(bot)
brew_message = Message(
0,
None,
self.group,
from_user=user1,
text="/brew",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/brew"))
],
)
brew_message.set_bot(bot)
pour_coffee_message = Message(
0,
None,
self.group,
from_user=user1,
text="/pourCoffee",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/pourCoffee"))
],
)
pour_coffee_message.set_bot(bot)
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=start_message))
assert handler.check_update(Update(0, message=brew_message))
await asyncio.sleep(0.75)
assert handler.check_update(Update(0, message=start_message))
# Start state machine, do something, then reach timeout
await app.process_update(Update(update_id=1, message=start_message))
assert handler.check_update(Update(0, message=brew_message))
# assert handler.conversations.get((self.group.id, user1.id)) == self.THIRSTY
# start_message.text = '/brew'
# start_message.entities[0].length = len('/brew')
await app.process_update(Update(update_id=2, message=brew_message))
assert handler.check_update(Update(0, message=pour_coffee_message))
# assert handler.conversations.get((self.group.id, user1.id)) == self.BREWING
await asyncio.sleep(0.75)
assert handler.check_update(Update(0, message=start_message))
# assert handler.conversations.get((self.group.id, user1.id)) is None
await app.stop()
async def test_timeout_not_triggered_on_conv_end_non_blocking(self, bot, app, user1):
def timeout(*a, **kw):
self.test_flag = True
self.states.update({ConversationHandler.TIMEOUT: [TypeHandler(Update, timeout)]})
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
block=False,
)
app.add_handler(handler)
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
# start the conversation
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.1)
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=1, message=message))
await asyncio.sleep(0.1)
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=2, message=message))
await asyncio.sleep(0.1)
message.text = "/end"
message.entities[0].length = len("/end")
await app.process_update(Update(update_id=3, message=message))
await asyncio.sleep(1)
# assert timeout handler didn't get called
assert self.test_flag is False
async def test_conversation_timeout_application_handler_stop(self, app, bot, user1, recwarn):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
def timeout(*args, **kwargs):
raise ApplicationHandlerStop
self.states.update({ConversationHandler.TIMEOUT: [TypeHandler(Update, timeout)]})
app.add_handler(handler)
# Start state machine, then reach timeout
message = Message(
0,
None,
self.group,
text="/start",
from_user=user1,
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
brew_message = Message(
0,
None,
self.group,
from_user=user1,
text="/brew",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/brew"))
],
)
brew_message.set_bot(bot)
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=message))
# Make sure that we're in the next state
assert handler.check_update(Update(0, message=brew_message))
await app.process_update(Update(0, message=brew_message))
await asyncio.sleep(0.9)
# Check that conversation has ended by checking that the start messages is accepted
# again
assert handler.check_update(Update(0, message=message))
assert len(recwarn) == 1
assert str(recwarn[0].message).startswith("ApplicationHandlerStop in TIMEOUT")
assert recwarn[0].category is PTBUserWarning
assert (
Path(recwarn[0].filename)
== PROJECT_ROOT_PATH / "telegram" / "ext" / "_jobqueue.py"
), "wrong stacklevel!"
await app.stop()
async def test_conversation_handler_timeout_update_and_context(self, app, bot, user1):
context = None
async def start_callback(u, c):
nonlocal context, self
context = c
return await self.start(u, c)
# Start state machine, then reach timeout
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
update = Update(update_id=0, message=message)
async def timeout_callback(u, c):
nonlocal update, context
assert u is update
assert c is context
self.is_timeout = (u is update) and (c is context)
states = self.states
timeout_handler = CommandHandler("start", timeout_callback)
states.update({ConversationHandler.TIMEOUT: [timeout_handler]})
handler = ConversationHandler(
entry_points=[CommandHandler("start", start_callback)],
states=states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
async with app:
await app.start()
await app.process_update(update)
await asyncio.sleep(0.9)
# check that the conversation has ended by checking that the start message is accepted
assert handler.check_update(Update(0, message=message))
assert self.is_timeout
await app.stop()
@pytest.mark.flaky(3, 1)
async def test_conversation_timeout_keeps_extending(self, app, bot, user1):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
# Start state machine, wait, do something, verify the timeout is extended.
# t=0 /start (timeout=.5)
# t=.35 /brew (timeout=.85)
# t=.5 original timeout
# t=.6 /pourCoffee (timeout=1.1)
# t=.85 second timeout
# t=1.1 actual timeout
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
assert handler.check_update(Update(0, message=message))
await asyncio.sleep(0.35) # t=.35
assert handler.check_update(Update(0, message=message))
await app.process_update(Update(update_id=0, message=message))
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
assert handler.check_update(Update(0, message=message))
await asyncio.sleep(0.25) # t=.6
assert handler.check_update(Update(0, message=message))
await app.process_update(Update(update_id=0, message=message))
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
assert handler.check_update(Update(0, message=message))
await asyncio.sleep(0.4) # t=1.0
assert handler.check_update(Update(0, message=message))
await asyncio.sleep(0.3) # t=1.3
assert not handler.check_update(Update(0, message=message))
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
await app.stop()
async def test_conversation_timeout_two_users(self, app, bot, user1, user2):
handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
# Start state machine, do something as second user, then reach timeout
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
assert handler.check_update(Update(0, message=message))
message.from_user = user2
await app.process_update(Update(update_id=0, message=message))
message.text = "/start"
message.entities[0].length = len("/start")
# Make sure that user2s conversation has not yet started
assert handler.check_update(Update(0, message=message))
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
assert handler.check_update(Update(0, message=message))
await asyncio.sleep(0.7)
# check that both conversations have ended by checking that the start message is
# accepted again
message.text = "/start"
message.entities[0].length = len("/start")
message.from_user = user1
assert handler.check_update(Update(0, message=message))
message.from_user = user2
assert handler.check_update(Update(0, message=message))
await app.stop()
async def test_conversation_handler_timeout_state(self, app, bot, user1):
states = self.states
states.update(
{
ConversationHandler.TIMEOUT: [
CommandHandler("brew", self.passout),
MessageHandler(~filters.Regex("oding"), self.passout2),
]
}
)
handler = ConversationHandler(
entry_points=self.entry_points,
states=states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
# CommandHandler timeout
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.7)
# check that conversation has ended by checking that start cmd is accepted again
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
assert self.is_timeout
# MessageHandler timeout
self.is_timeout = False
message.text = "/start"
message.entities[0].length = len("/start")
await app.process_update(Update(update_id=1, message=message))
await asyncio.sleep(0.7)
# check that conversation has ended by checking that start cmd is accepted again
assert handler.check_update(Update(0, message=message))
assert self.is_timeout
# Timeout but no valid handler
self.is_timeout = False
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.7)
# check that conversation has ended by checking that start cmd is accepted again
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
assert not self.is_timeout
await app.stop()
async def test_conversation_handler_timeout_state_context(self, app, bot, user1):
states = self.states
states.update(
{
ConversationHandler.TIMEOUT: [
CommandHandler("brew", self.passout_context),
MessageHandler(~filters.Regex("oding"), self.passout2_context),
]
}
)
handler = ConversationHandler(
entry_points=self.entry_points,
states=states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
# CommandHandler timeout
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.7)
# check that conversation has ended by checking that start cmd is accepted again
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
assert self.is_timeout
# MessageHandler timeout
self.is_timeout = False
message.text = "/start"
message.entities[0].length = len("/start")
await app.process_update(Update(update_id=1, message=message))
await asyncio.sleep(0.7)
# check that conversation has ended by checking that start cmd is accepted again
assert handler.check_update(Update(0, message=message))
assert self.is_timeout
# Timeout but no valid handler
self.is_timeout = False
await app.process_update(Update(update_id=0, message=message))
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.7)
# check that conversation has ended by checking that start cmd is accepted again
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
assert not self.is_timeout
await app.stop()
async def test_conversation_timeout_cancel_conflict(self, app, bot, user1):
# Start state machine, wait half the timeout,
# then call a callback that takes more than the timeout
# t=0 /start (timeout=.5)
# t=.25 /slowbrew (sleep .5)
# | t=.5 original timeout (should not execute)
# | t=.75 /slowbrew returns (timeout=1.25)
# t=1.25 timeout
async def slowbrew(_update, context):
await asyncio.sleep(0.25)
# Let's give to the original timeout a chance to execute
await asyncio.sleep(0.25)
# By returning None we do not override the conversation state so
# we can see if the timeout has been executed
states = self.states
states[self.THIRSTY].append(CommandHandler("slowbrew", slowbrew))
states.update({ConversationHandler.TIMEOUT: [MessageHandler(None, self.passout2)]})
handler = ConversationHandler(
entry_points=self.entry_points,
states=states,
fallbacks=self.fallbacks,
conversation_timeout=0.5,
)
app.add_handler(handler)
# CommandHandler timeout
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.start()
await app.process_update(Update(update_id=0, message=message))
await asyncio.sleep(0.25)
message.text = "/slowbrew"
message.entities[0].length = len("/slowbrew")
await app.process_update(Update(update_id=0, message=message))
# Check that conversation has not ended by checking that start cmd is not accepted
message.text = "/start"
message.entities[0].length = len("/start")
assert not handler.check_update(Update(0, message=message))
assert not self.is_timeout
await asyncio.sleep(0.7)
# Check that conversation has ended by checking that start cmd is accepted again
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
assert self.is_timeout
await app.stop()
async def test_nested_conversation_handler(self, app, bot, user1, user2):
self.nested_states[self.DRINKING] = [
ConversationHandler(
entry_points=self.drinking_entry_points,
states=self.drinking_states,
fallbacks=self.drinking_fallbacks,
map_to_parent=self.drinking_map_to_parent,
)
]
handler = ConversationHandler(
entry_points=self.entry_points, states=self.nested_states, fallbacks=self.fallbacks
)
app.add_handler(handler)
# User one, starts the state machine.
message = Message(
0,
None,
self.group,
from_user=user1,
text="/start",
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.THIRSTY
# The user is thirsty and wants to brew coffee.
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.BREWING
# Lets pour some coffee.
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
# The user is holding the cup
message.text = "/hold"
message.entities[0].length = len("/hold")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.HOLDING
# The user is sipping coffee
message.text = "/sip"
message.entities[0].length = len("/sip")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.SIPPING
# The user is swallowing
message.text = "/swallow"
message.entities[0].length = len("/swallow")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.SWALLOWING
# The user is holding the cup again
message.text = "/hold"
message.entities[0].length = len("/hold")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.HOLDING
# The user wants to replenish the coffee supply
message.text = "/replenish"
message.entities[0].length = len("/replenish")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.REPLENISHING
# check that we're in the right state now by checking that the update is accepted
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
assert handler.check_update(Update(0, message=message))
# The user wants to drink their coffee again)
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
# The user is now ready to start coding
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.CODING
# The user decides it's time to drink again
message.text = "/drinkMore"
message.entities[0].length = len("/drinkMore")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
# The user is holding their cup
message.text = "/hold"
message.entities[0].length = len("/hold")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.HOLDING
# The user wants to end with the drinking and go back to coding
message.text = "/end"
message.entities[0].length = len("/end")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.END
# check that we're in the right state now by checking that the update is accepted
message.text = "/drinkMore"
message.entities[0].length = len("/drinkMore")
assert handler.check_update(Update(0, message=message))
# The user wants to drink once more
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
# The user wants to stop altogether
message.text = "/stop"
message.entities[0].length = len("/stop")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.STOPPING
# check that the conversation has ended by checking that the start cmd is accepted
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
async def test_nested_conversation_application_handler_stop(self, app, bot, user1, user2):
self.nested_states[self.DRINKING] = [
ConversationHandler(
entry_points=self.drinking_entry_points,
states=self.drinking_states,
fallbacks=self.drinking_fallbacks,
map_to_parent=self.drinking_map_to_parent,
)
]
handler = ConversationHandler(
entry_points=self.entry_points, states=self.nested_states, fallbacks=self.fallbacks
)
def test_callback(u, c):
self.test_flag = True
app.add_handler(handler)
app.add_handler(TypeHandler(Update, test_callback), group=1)
self.raise_app_handler_stop = True
# User one, starts the state machine.
message = Message(
0,
None,
self.group,
text="/start",
from_user=user1,
entities=[
MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len("/start"))
],
)
message.set_bot(bot)
message._unfreeze()
message.entities[0]._unfreeze()
async with app:
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.THIRSTY
assert not self.test_flag
# The user is thirsty and wants to brew coffee.
message.text = "/brew"
message.entities[0].length = len("/brew")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.BREWING
assert not self.test_flag
# Lets pour some coffee.
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
assert not self.test_flag
# The user is holding the cup
message.text = "/hold"
message.entities[0].length = len("/hold")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.HOLDING
assert not self.test_flag
# The user is sipping coffee
message.text = "/sip"
message.entities[0].length = len("/sip")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.SIPPING
assert not self.test_flag
# The user is swallowing
message.text = "/swallow"
message.entities[0].length = len("/swallow")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.SWALLOWING
assert not self.test_flag
# The user is holding the cup again
message.text = "/hold"
message.entities[0].length = len("/hold")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.HOLDING
assert not self.test_flag
# The user wants to replenish the coffee supply
message.text = "/replenish"
message.entities[0].length = len("/replenish")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.REPLENISHING
# check that we're in the right state now by checking that the update is accepted
message.text = "/pourCoffee"
message.entities[0].length = len("/pourCoffee")
assert handler.check_update(Update(0, message=message))
assert not self.test_flag
# The user wants to drink their coffee again
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
assert not self.test_flag
# The user is now ready to start coding
message.text = "/startCoding"
message.entities[0].length = len("/startCoding")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.CODING
assert not self.test_flag
# The user decides it's time to drink again
message.text = "/drinkMore"
message.entities[0].length = len("/drinkMore")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
assert not self.test_flag
# The user is holding their cup
message.text = "/hold"
message.entities[0].length = len("/hold")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.HOLDING
assert not self.test_flag
# The user wants to end with the drinking and go back to coding
message.text = "/end"
message.entities[0].length = len("/end")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.END
# check that we're in the right state now by checking that the update is accepted
message.text = "/drinkMore"
message.entities[0].length = len("/drinkMore")
assert handler.check_update(Update(0, message=message))
assert not self.test_flag
# The user wants to drink once more
message.text = "/drinkMore"
message.entities[0].length = len("/drinkMore")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.DRINKING
assert not self.test_flag
# The user wants to stop altogether
message.text = "/stop"
message.entities[0].length = len("/stop")
await app.process_update(Update(update_id=0, message=message))
assert self.current_state[user1.id] == self.STOPPING
# check that the conv has ended by checking that the start cmd is accepted
message.text = "/start"
message.entities[0].length = len("/start")
assert handler.check_update(Update(0, message=message))
assert not self.test_flag
@pytest.mark.parametrize("callback_raises", [True, False])
async def test_timeout_non_block(self, app, user1, callback_raises):
event = asyncio.Event()
async def callback(_, __):
await event.wait()
if callback_raises:
raise RuntimeError
return 1
conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.ALL, callback=callback, block=False)],
states={ConversationHandler.TIMEOUT: [TypeHandler(Update, self.passout2)]},
fallbacks=[],
conversation_timeout=0.5,
)
app.add_handler(conv_handler)
async with app:
await app.start()
message = Message(
0,
None,
self.group,
text="/start",
from_user=user1,
)
assert conv_handler.check_update(Update(0, message=message))
await app.process_update(Update(0, message=message))
await asyncio.sleep(0.7)
tasks = asyncio.all_tasks()
assert any(":handle_update:non_blocking_cb" in t.get_name() for t in tasks)
assert any(":handle_update:timeout_job" in t.get_name() for t in tasks)
assert not self.is_timeout
event.set()
await asyncio.sleep(0.7)
assert self.is_timeout == (not callback_raises)
await app.stop()
async def test_no_timeout_on_end(self, app, user1):
conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.ALL, callback=self.start_end)],
states={ConversationHandler.TIMEOUT: [TypeHandler(Update, self.passout2)]},
fallbacks=[],
conversation_timeout=0.5,
)
app.add_handler(conv_handler)
async with app:
await app.start()
message = Message(
0,
None,
self.group,
text="/start",
from_user=user1,
)
assert conv_handler.check_update(Update(0, message=message))
await app.process_update(Update(0, message=message))
await asyncio.sleep(0.7)
assert not self.is_timeout
await app.stop()
async def test_conversation_handler_block_dont_override(self, app):
"""This just makes sure that we don't change any attributes of the handlers of the conv"""
conv_handler = ConversationHandler(
entry_points=self.entry_points,
states=self.states,
fallbacks=self.fallbacks,
block=False,
)
all_handlers = conv_handler.entry_points + conv_handler.fallbacks
for state_handlers in conv_handler.states.values():
all_handlers += state_handlers
for handler in all_handlers:
assert handler.block
conv_handler = ConversationHandler(
entry_points=[CommandHandler("start", self.start_end, block=False)],
states={1: [CommandHandler("start", self.start_end, block=False)]},
fallbacks=[CommandHandler("start", self.start_end, block=False)],
block=True,
)
all_handlers = conv_handler.entry_points + conv_handler.fallbacks
for state_handlers in conv_handler.states.values():
all_handlers += state_handlers
for handler in all_handlers:
assert handler.block is False
@pytest.mark.parametrize("default_block", [True, False, None])
@pytest.mark.parametrize("ch_block", [True, False, None])
@pytest.mark.parametrize("handler_block", [True, False, None])
@pytest.mark.parametrize("ext_bot", [True, False], ids=["ExtBot", "Bot"])
async def test_blocking_resolution_order(
self, bot_info, default_block, ch_block, handler_block, ext_bot
):
event = asyncio.Event()
async def callback(_, __):
await event.wait()
event.clear()
self.test_flag = True
return 1
if handler_block is not None:
handler = CommandHandler("start", callback=callback, block=handler_block)
fallback = MessageHandler(filters.ALL, callback, block=handler_block)
else:
handler = CommandHandler("start", callback=callback)
fallback = MessageHandler(filters.ALL, callback, block=handler_block)
defaults = Defaults(block=default_block) if default_block is not None else None
if ch_block is not None:
conv_handler = ConversationHandler(
entry_points=[handler],
states={1: [handler]},
fallbacks=[fallback],
block=ch_block,
)
else:
conv_handler = ConversationHandler(
entry_points=[handler],
states={1: [handler]},
fallbacks=[fallback],
)
bot = make_bot(bot_info, defaults=defaults) if ext_bot else PytestBot(bot_info["token"])
app = ApplicationBuilder().bot(bot).build()
app.add_handler(conv_handler)
async with app:
start_message = make_command_message("/start")
start_message.set_bot(bot)
fallback_message = make_command_message("/fallback")
fallback_message.set_bot(bot)
# This loop makes sure that we test all of entry points, states handler & fallbacks
for message in [start_message, fallback_message]:
process_update_task = asyncio.create_task(
app.process_update(Update(0, message=message))
)
if (
# resolution order is handler_block -> ch_block -> default_block
# setting block=True/False on a lower priority setting may only have an effect
# if it wasn't set for the higher priority settings
(handler_block is False)
or ((handler_block is None) and (ch_block is False))
or (
(handler_block is None)
and (ch_block is None)
and ext_bot
and (default_block is False)
)
):
# check that the handler was called non-blocking by checking that
# `process_update` has finished
await asyncio.sleep(0.01)
assert process_update_task.done()
else:
# the opposite
assert not process_update_task.done()
# In any case, the callback must not have finished
assert not self.test_flag
# After setting the event, the callback must have finished and in the blocking
# case this leads to `process_update` finishing.
event.set()
await asyncio.sleep(0.01)
assert process_update_task.done()
assert self.test_flag
self.test_flag = False
async def test_waiting_state(self, app, user1):
event = asyncio.Event()
async def callback_1(_, __):
self.test_flag = 1
async def callback_2(_, __):
self.test_flag = 2
async def callback_3(_, __):
self.test_flag = 3
async def blocking(_, __):
await event.wait()
return 1
conv_handler = ConversationHandler(
entry_points=[MessageHandler(filters.ALL, callback=blocking, block=False)],
states={
ConversationHandler.WAITING: [
MessageHandler(filters.Regex("1"), callback_1),
MessageHandler(filters.Regex("2"), callback_2),
],
1: [MessageHandler(filters.Regex("2"), callback_3)],
},
fallbacks=[],
)
app.add_handler(conv_handler)
message = Message(
0,
None,
self.group,
text="/start",
from_user=user1,
)
message._unfreeze()
async with app:
await app.process_update(Update(0, message=message))
assert not self.test_flag
message.text = "1"
await app.process_update(Update(0, message=message))
assert self.test_flag == 1
message.text = "2"
await app.process_update(Update(0, message=message))
assert self.test_flag == 2
event.set()
await asyncio.sleep(0.05)
self.test_flag = None
await app.process_update(Update(0, message=message))
assert self.test_flag == 3