mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2025-01-05 10:24:48 +01:00
973 lines
42 KiB
Python
973 lines
42 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# A library that provides a Python interface to the Telegram Bot API
|
|
# Copyright (C) 2015-2024
|
|
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser Public License
|
|
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
|
"""This module contains the ConversationHandler."""
|
|
import asyncio
|
|
import datetime
|
|
from dataclasses import dataclass
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Dict,
|
|
Final,
|
|
Generic,
|
|
List,
|
|
NoReturn,
|
|
Optional,
|
|
Set,
|
|
Tuple,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
from telegram import Update
|
|
from telegram._utils.defaultvalue import DEFAULT_TRUE, DefaultValue
|
|
from telegram._utils.logging import get_logger
|
|
from telegram._utils.repr import build_repr_with_selected_attrs
|
|
from telegram._utils.types import DVType
|
|
from telegram._utils.warnings import warn
|
|
from telegram.ext._application import ApplicationHandlerStop
|
|
from telegram.ext._extbot import ExtBot
|
|
from telegram.ext._handlers.basehandler import BaseHandler
|
|
from telegram.ext._handlers.callbackqueryhandler import CallbackQueryHandler
|
|
from telegram.ext._handlers.choseninlineresulthandler import ChosenInlineResultHandler
|
|
from telegram.ext._handlers.inlinequeryhandler import InlineQueryHandler
|
|
from telegram.ext._handlers.stringcommandhandler import StringCommandHandler
|
|
from telegram.ext._handlers.stringregexhandler import StringRegexHandler
|
|
from telegram.ext._handlers.typehandler import TypeHandler
|
|
from telegram.ext._utils.trackingdict import TrackingDict
|
|
from telegram.ext._utils.types import CCT, ConversationDict, ConversationKey
|
|
|
|
if TYPE_CHECKING:
    # Only needed for annotations; the names are referenced in quoted or function-local
    # annotations elsewhere in this module.
    from telegram.ext import Application, Job, JobQueue

# Shape of the value returned by ``ConversationHandler.check_update``:
# (conversation state, conversation key, selected handler, that handler's check result).
_CheckUpdateType = Tuple[object, ConversationKey, BaseHandler[Update, CCT], object]

# Module-wide logger, registered under the ConversationHandler class name.
_LOGGER = get_logger(__name__, class_name="ConversationHandler")
|
|
|
|
|
|
@dataclass
class _ConversationTimeoutContext(Generic[CCT]):
    """Used as a datastore for conversation timeouts. Passed in the
    :paramref:`JobQueue.run_once.data` parameter. See :meth:`_trigger_timeout`.

    Instances are built positionally by ``ConversationHandler._schedule_job`` as
    ``(conversation_key, update, application, context)``, so the field order below is part of
    the interface.
    """

    __slots__ = ("application", "callback_context", "conversation_key", "update")

    # Key of the conversation the timeout job was scheduled for.
    conversation_key: ConversationKey
    # Update that was being handled when the timeout job was scheduled.
    update: Update
    # Application whose job queue runs the timeout job.
    application: "Application[Any, CCT, Any, Any, Any, JobQueue]"
    # Callback context that accompanied ``update``.
    callback_context: CCT
|
|
|
|
|
|
@dataclass
class PendingState:
    """Thin wrapper around :class:`asyncio.Task` to handle block=False handlers. Note that this is
    a public class of this module, since :meth:`Application.update_persistence` needs to access it.
    It's still hidden from users, since this module itself is private.
    """

    __slots__ = ("old_state", "task")

    # Task running the non-blocking handler callback; its result is the new state.
    task: asyncio.Task
    # State the conversation was in before the callback was started.
    old_state: object

    def done(self) -> bool:
        """Returns whether the wrapped task has finished.

        Returns:
            :obj:`bool`
        """
        return self.task.done()

    def resolve(self) -> object:
        """Returns the new state of the :class:`ConversationHandler` if available. If there was an
        exception during the task execution, then return the old state. If both the new and old
        state are :obj:`None`, return `CH.END`. If only the new state is :obj:`None`, return the
        old state.

        Raises:
            :exc:`RuntimeError`: If the current task has not yet finished.
        """
        if not self.task.done():
            raise RuntimeError("New state is not yet available")

        exc = self.task.exception()
        if exc:
            # We are not inside an ``except`` block here, so the exception has to be handed to
            # the logger explicitly. Otherwise no traceback is attached to the log record.
            _LOGGER.exception(
                "Task function raised exception. Falling back to old state %s",
                self.old_state,
                exc_info=exc,
            )
            return self.old_state

        res = self.task.result()
        if res is None and self.old_state is None:
            res = ConversationHandler.END
        elif res is None:
            # returning None from a callback means that we want to stay in the old state
            return self.old_state

        return res
|
|
|
|
|
|
class ConversationHandler(BaseHandler[Update, CCT]):
|
|
"""
|
|
A handler to hold a conversation with a single or multiple users through Telegram updates by
|
|
managing three collections of other handlers.
|
|
|
|
Warning:
|
|
:class:`ConversationHandler` heavily relies on incoming updates being processed one by one.
|
|
When using this handler, :attr:`telegram.ext.ApplicationBuilder.concurrent_updates` should
|
|
be set to :obj:`False`.
|
|
|
|
Note:
|
|
:class:`ConversationHandler` will only accept updates that are (subclass-)instances of
|
|
:class:`telegram.Update`. This is, because depending on the :attr:`per_user` and
|
|
:attr:`per_chat`, :class:`ConversationHandler` relies on
|
|
:attr:`telegram.Update.effective_user` and/or :attr:`telegram.Update.effective_chat` in
|
|
order to determine which conversation an update should belong to. For
|
|
:attr:`per_message=True <per_message>`, :class:`ConversationHandler` uses
|
|
:attr:`update.callback_query.message.message_id <telegram.Message.message_id>` when
|
|
:attr:`per_chat=True <per_chat>` and
|
|
:attr:`update.callback_query.inline_message_id <.CallbackQuery.inline_message_id>` when
|
|
:attr:`per_chat=False <per_chat>`. For a more detailed explanation, please see our `FAQ`_.
|
|
|
|
Finally, :class:`ConversationHandler`, does *not* handle (edited) channel posts.
|
|
|
|
.. _`FAQ`: https://github.com/python-telegram-bot/python-telegram-bot/wiki\
|
|
/Frequently-Asked-Questions#what-do-the-per_-settings-in-conversationhandler-do
|
|
|
|
The first collection, a :obj:`list` named :attr:`entry_points`, is used to initiate the
|
|
conversation, for example with a :class:`telegram.ext.CommandHandler` or
|
|
:class:`telegram.ext.MessageHandler`.
|
|
|
|
The second collection, a :obj:`dict` named :attr:`states`, contains the different conversation
|
|
steps and one or more associated handlers that should be used if the user sends a message when
|
|
the conversation with them is currently in that state. Here you can also define a state for
|
|
:attr:`TIMEOUT` to define the behavior when :attr:`conversation_timeout` is exceeded, and a
|
|
state for :attr:`WAITING` to define behavior when a new update is received while the previous
|
|
:attr:`block=False <block>` handler is not finished.
|
|
|
|
The third collection, a :obj:`list` named :attr:`fallbacks`, is used if the user is currently
|
|
in a conversation but the state has either no associated handler or the handler that is
|
|
associated to the state is inappropriate for the update, for example if the update contains a
|
|
command, but a regular text message is expected. You could use this for a ``/cancel`` command
|
|
or to let the user know their message was not recognized.
|
|
|
|
To change the state of conversation, the callback function of a handler must return the new
|
|
state after responding to the user. If it does not return anything (returning :obj:`None` by
|
|
default), the state will not change. If an entry point callback function returns :obj:`None`,
|
|
the conversation ends immediately after the execution of this callback function.
|
|
To end the conversation, the callback function must return :attr:`END` or ``-1``. To
|
|
handle the conversation timeout, use handler :attr:`TIMEOUT` or ``-2``.
|
|
Finally, :class:`telegram.ext.ApplicationHandlerStop` can be used in conversations as described
|
|
in its documentation.
|
|
|
|
Note:
|
|
In each of the described collections of handlers, a handler may in turn be a
|
|
:class:`ConversationHandler`. In that case, the child :class:`ConversationHandler` should
|
|
have the attribute :attr:`map_to_parent` which allows returning to the parent conversation
|
|
at specified states within the child conversation.
|
|
|
|
Note that the keys in :attr:`map_to_parent` must not appear as keys in :attr:`states`
|
|
attribute or else the latter will be ignored. You may map :attr:`END` to one of the parent's
|
|
states to continue the parent conversation after the child conversation has ended or even
|
|
map a state to :attr:`END` to end the *parent* conversation from within the child
|
|
conversation. For an example on nested :class:`ConversationHandler` s, see
|
|
:any:`examples.nestedconversationbot`.
|
|
|
|
Examples:
|
|
* :any:`Conversation Bot <examples.conversationbot>`
|
|
* :any:`Conversation Bot 2 <examples.conversationbot2>`
|
|
* :any:`Nested Conversation Bot <examples.nestedconversationbot>`
|
|
* :any:`Persistent Conversation Bot <examples.persistentconversationbot>`
|
|
|
|
Args:
|
|
entry_points (List[:class:`telegram.ext.BaseHandler`]): A list of :obj:`BaseHandler`
|
|
objects that
|
|
can trigger the start of the conversation. The first handler whose :meth:`check_update`
|
|
method returns :obj:`True` will be used. If all return :obj:`False`, the update is not
|
|
handled.
|
|
states (Dict[:obj:`object`, List[:class:`telegram.ext.BaseHandler`]]): A :obj:`dict` that
|
|
defines the different states of conversation a user can be in and one or more
|
|
associated :obj:`BaseHandler` objects that should be used in that state. The first
|
|
handler whose :meth:`check_update` method returns :obj:`True` will be used.
|
|
fallbacks (List[:class:`telegram.ext.BaseHandler`]): A list of handlers that might be used
|
|
if the user is in a conversation, but every handler for their current state returned
|
|
:obj:`False` on :meth:`check_update`. The first handler whose :meth:`check_update`
|
|
method returns :obj:`True` will be used. If all return :obj:`False`, the update is not
|
|
handled.
|
|
allow_reentry (:obj:`bool`, optional): If set to :obj:`True`, a user that is currently in a
|
|
conversation can restart the conversation by triggering one of the entry points.
|
|
Default is :obj:`False`.
|
|
per_chat (:obj:`bool`, optional): If the conversation key should contain the Chat's ID.
|
|
Default is :obj:`True`.
|
|
per_user (:obj:`bool`, optional): If the conversation key should contain the User's ID.
|
|
Default is :obj:`True`.
|
|
per_message (:obj:`bool`, optional): If the conversation key should contain the Message's
|
|
ID. Default is :obj:`False`.
|
|
conversation_timeout (:obj:`float` | :obj:`datetime.timedelta`, optional): When this
|
|
handler is inactive more than this timeout (in seconds), it will be automatically
|
|
ended. If this value is ``0`` or :obj:`None` (default), there will be no timeout. The
|
|
last received update and the corresponding :class:`context <.CallbackContext>` will be
|
|
handled by *ALL* the handlers whose :meth:`check_update` method returns :obj:`True`
|
|
that are in the state :attr:`ConversationHandler.TIMEOUT`.
|
|
|
|
Caution:
|
|
* This feature relies on the :attr:`telegram.ext.Application.job_queue` being set
|
|
and hence requires that the dependencies that :class:`telegram.ext.JobQueue`
|
|
relies on are installed.
|
|
* Using :paramref:`conversation_timeout` with nested conversations is currently
|
|
not supported. You can still try to use it, but it will likely behave
|
|
differently from what you expect.
|
|
|
|
name (:obj:`str`, optional): The name for this conversation handler. Required for
|
|
persistence.
|
|
persistent (:obj:`bool`, optional): If the conversation's dict for this handler should be
|
|
saved. :paramref:`name` is required and persistence has to be set in
|
|
:attr:`Application <.Application.persistence>`.
|
|
|
|
.. versionchanged:: 20.0
|
|
Was previously named as ``persistence``.
|
|
map_to_parent (Dict[:obj:`object`, :obj:`object`], optional): A :obj:`dict` that can be
|
|
used to instruct a child conversation handler to transition into a mapped state on
|
|
its parent conversation handler in place of a specified nested state.
|
|
block (:obj:`bool`, optional): Pass :obj:`False` or :obj:`True` to set a default value for
|
|
the :attr:`BaseHandler.block` setting of all handlers (in :attr:`entry_points`,
|
|
:attr:`states` and :attr:`fallbacks`). The resolution order for checking if a handler
|
|
should be run non-blocking is:
|
|
|
|
1. :attr:`telegram.ext.BaseHandler.block` (if set)
|
|
2. the value passed to this parameter (if any)
|
|
3. :attr:`telegram.ext.Defaults.block` (if defaults are used)
|
|
|
|
.. seealso:: :wiki:`Concurrency`
|
|
|
|
.. versionchanged:: 20.0
|
|
No longer overrides the handlers settings. Resolution order was changed.
|
|
|
|
Raises:
|
|
:exc:`ValueError`: If :paramref:`persistent` is used but :paramref:`name` was not set, or
|
|
when :attr:`per_message`, :attr:`per_chat`, :attr:`per_user` are all :obj:`False`.
|
|
|
|
Attributes:
|
|
block (:obj:`bool`): Determines whether the callback will run in a blocking way. Always
|
|
:obj:`True` since conversation handlers handle any non-blocking callbacks internally.
|
|
|
|
"""
|
|
|
|
# Covers the private attributes assigned in ``__init__`` plus the public ``timeout_jobs``
# mapping. ``block`` is also assigned in ``__init__`` but is not listed here
# (presumably it is a slot of the base class - TODO confirm).
__slots__ = (
    "_allow_reentry",
    "_block",
    "_child_conversations",
    "_conversation_timeout",
    "_conversations",
    "_entry_points",
    "_fallbacks",
    "_map_to_parent",
    "_name",
    "_per_chat",
    "_per_message",
    "_per_user",
    "_persistent",
    "_states",
    "_timeout_jobs_lock",
    "timeout_jobs",
)

# Reserved negative state values; the strings below each constant document them.
END: Final[int] = -1
""":obj:`int`: Used as a constant to return when a conversation is ended."""
TIMEOUT: Final[int] = -2
""":obj:`int`: Used as a constant to handle state when a conversation is timed out
(exceeded :attr:`conversation_timeout`).
"""
WAITING: Final[int] = -3
""":obj:`int`: Used as a constant to handle state when a conversation is still waiting on the
previous :attr:`block=False <block>` handler to finish."""
|
|
|
|
# pylint: disable=super-init-not-called
def __init__(
    self,
    entry_points: List[BaseHandler[Update, CCT]],
    states: Dict[object, List[BaseHandler[Update, CCT]]],
    fallbacks: List[BaseHandler[Update, CCT]],
    allow_reentry: bool = False,
    per_chat: bool = True,
    per_user: bool = True,
    per_message: bool = False,
    conversation_timeout: Optional[Union[float, datetime.timedelta]] = None,
    name: Optional[str] = None,
    persistent: bool = False,
    map_to_parent: Optional[Dict[object, object]] = None,
    block: DVType[bool] = DEFAULT_TRUE,
):
    # Stores the configuration, validates the name/persistence and per_* combinations and
    # emits warnings for handlers that cannot work as expected inside a conversation.
    # All warnings are issued directly from this method so that ``stacklevel=2`` keeps
    # pointing at the code that instantiated the handler.

    # Importing at module level would cause a circular import
    from telegram.ext import (  # pylint: disable=import-outside-toplevel
        PollAnswerHandler,
        PollHandler,
        PreCheckoutQueryHandler,
        ShippingQueryHandler,
    )

    # The Application only looks at ``self.block``. The conversation handler itself must
    # always run blocking, so that it can deal with non-blocking child handlers internally.
    self.block: DVType[bool] = True
    # The value requested by the user is kept separately
    self._block: DVType[bool] = block

    self._entry_points: List[BaseHandler[Update, CCT]] = entry_points
    self._states: Dict[object, List[BaseHandler[Update, CCT]]] = states
    self._fallbacks: List[BaseHandler[Update, CCT]] = fallbacks

    self._allow_reentry: bool = allow_reentry
    self._per_user: bool = per_user
    self._per_chat: bool = per_chat
    self._per_message: bool = per_message
    self._conversation_timeout: Optional[Union[float, datetime.timedelta]] = (
        conversation_timeout
    )
    self._name: Optional[str] = name
    self._map_to_parent: Optional[Dict[object, object]] = map_to_parent

    # Holds, per conversation, the job that fires once the conversation has timed out.
    # Only filled if conversation_timeout is used.
    self.timeout_jobs: Dict[ConversationKey, Job[Any]] = {}
    self._timeout_jobs_lock = asyncio.Lock()
    self._conversations: ConversationDict = {}
    self._child_conversations: Set[ConversationHandler] = set()

    if persistent and not self.name:
        raise ValueError("Conversations can't be persistent when handler is unnamed.")
    self._persistent: bool = persistent

    if not (self.per_user or self.per_chat or self.per_message):
        raise ValueError("'per_user', 'per_chat' and 'per_message' can't all be 'False'")

    if self.per_message and not self.per_chat:
        warn(
            "If 'per_message=True' is used, 'per_chat=True' should also be used, "
            "since message IDs are not globally unique.",
            stacklevel=2,
        )

    # Entry points first, then fallbacks, then the handlers of every state
    all_handlers: List[BaseHandler[Update, CCT]] = [*entry_points, *fallbacks]
    for handlers_of_state in states.values():
        all_handlers += handlers_of_state

    self._child_conversations |= {
        member for member in all_handlers if isinstance(member, ConversationHandler)
    }

    # Appended to every warning that concerns the per_* settings
    per_faq_link = (
        " Read this FAQ entry to learn more about the per_* settings: "
        "https://github.com/python-telegram-bot/python-telegram-bot/wiki"
        "/Frequently-Asked-Questions#what-do-the-per_-settings-in-conversationhandler-do."
    )

    # Handler types whose updates carry a user but no chat
    user_only_handlers = (
        ShippingQueryHandler,
        InlineQueryHandler,
        ChosenInlineResultHandler,
        PreCheckoutQueryHandler,
        PollAnswerHandler,
    )

    # Point out every handler that can behave unexpectedly inside a conversation
    for member in all_handlers:
        if isinstance(member, (StringCommandHandler, StringRegexHandler)):
            warn(
                "The `ConversationHandler` only handles updates of type `telegram.Update`. "
                f"{member.__class__.__name__} handles updates of type `str`.",
                stacklevel=2,
            )
        elif isinstance(member, TypeHandler) and not issubclass(member.type, Update):
            warn(
                "The `ConversationHandler` only handles updates of type `telegram.Update`."
                f" The TypeHandler is set to handle {member.type.__name__}.",
                stacklevel=2,
            )
        elif isinstance(member, PollHandler):
            warn(
                "PollHandler will never trigger in a conversation since it has no information "
                "about the chat or the user who voted in it. Do you mean the "
                "`PollAnswerHandler`?",
                stacklevel=2,
            )
        elif self.per_chat and isinstance(member, user_only_handlers):
            warn(
                f"Updates handled by {member.__class__.__name__} only have information about "
                "the user, so this handler won't ever be triggered if `per_chat=True`."
                f"{per_faq_link}",
                stacklevel=2,
            )
        elif self.per_message and not isinstance(member, CallbackQueryHandler):
            warn(
                "If 'per_message=True', all entry points, state handlers, and fallbacks"
                " must be 'CallbackQueryHandler', since no other handlers "
                f"have a message context.{per_faq_link}",
                stacklevel=2,
            )
        elif not self.per_message and isinstance(member, CallbackQueryHandler):
            warn(
                "If 'per_message=False', 'CallbackQueryHandler' will not be "
                f"tracked for every message.{per_faq_link}",
                stacklevel=2,
            )

        # Checked independently of the chain above: nested conversations + timeout
        if self.conversation_timeout and isinstance(member, self.__class__):
            warn(
                "Using `conversation_timeout` with nested conversations is currently not "
                "supported. You can still try to use it, but it will likely behave "
                "differently from what you expect.",
                stacklevel=2,
            )
|
|
|
|
def __repr__(self) -> str:
    """Build a string representation of the ConversationHandler of the form
    ``ConversationHandler[name=..., states={...}]``.

    At most the first 3 states are listed; if there are more, the listing ends with ``...``.

    As this class doesn't implement :meth:`object.__str__`, the default implementation
    will be used, which is equivalent to :meth:`__repr__`.

    Returns:
        :obj:`str`
    """
    max_listed = 3
    listed = dict(list(self.states.items())[:max_listed])
    rendered = str(listed)
    if len(self.states) > max_listed:
        # Replace the closing brace by an ellipsis marker
        rendered = rendered[:-1] + ", ...}"

    return build_repr_with_selected_attrs(self, name=self.name, states=rendered)
|
|
|
|
@property
def entry_points(self) -> List[BaseHandler[Update, CCT]]:
    """List[:class:`telegram.ext.BaseHandler`]: The :obj:`BaseHandler` objects that are able
    to start the conversation.
    """
    return self._entry_points

@entry_points.setter
def entry_points(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to entry_points after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def states(self) -> Dict[object, List[BaseHandler[Update, CCT]]]:
    """Dict[:obj:`object`, List[:class:`telegram.ext.BaseHandler`]]: Maps each conversation
    state a user can be in to the :obj:`BaseHandler` objects that should be used in that
    state.
    """
    return self._states

@states.setter
def states(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to states after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def fallbacks(self) -> List[BaseHandler[Update, CCT]]:
    """List[:class:`telegram.ext.BaseHandler`]: The handlers that might be used if the user
    is in a conversation, but every handler for their current state returned :obj:`False`
    on :meth:`check_update`.
    """
    return self._fallbacks

@fallbacks.setter
def fallbacks(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to fallbacks after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def allow_reentry(self) -> bool:
    """:obj:`bool`: Whether a user may restart a conversation through an entry point."""
    return self._allow_reentry

@allow_reentry.setter
def allow_reentry(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to allow_reentry after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def per_user(self) -> bool:
    """:obj:`bool`: Whether the User's ID is part of the conversation key."""
    return self._per_user

@per_user.setter
def per_user(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to per_user after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def per_chat(self) -> bool:
    """:obj:`bool`: Whether the Chat's ID is part of the conversation key."""
    return self._per_chat

@per_chat.setter
def per_chat(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to per_chat after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def per_message(self) -> bool:
    """:obj:`bool`: Whether the message's ID is part of the conversation key."""
    return self._per_message

@per_message.setter
def per_message(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to per_message after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def conversation_timeout(
    self,
) -> Optional[Union[float, datetime.timedelta]]:
    """:obj:`float` | :obj:`datetime.timedelta`: Optional. Once this handler has been
    inactive for longer than this timeout (in seconds), it is ended automatically.
    """
    return self._conversation_timeout

@conversation_timeout.setter
def conversation_timeout(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to conversation_timeout after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def name(self) -> Optional[str]:
    """:obj:`str`: Optional. The name given to this :class:`ConversationHandler`."""
    return self._name

@name.setter
def name(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to name after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def persistent(self) -> bool:
    """:obj:`bool`: Optional. Whether the conversations dict of this handler should be
    saved. :attr:`name` is required and persistence has to be set in
    :attr:`Application <.Application.persistence>`.
    """
    return self._persistent

@persistent.setter
def persistent(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to persistent after initialization."
    raise AttributeError(message)
|
|
|
|
@property
def map_to_parent(self) -> Optional[Dict[object, object]]:
    """Dict[:obj:`object`, :obj:`object`]: Optional. A :obj:`dict` that tells a nested
    :class:`ConversationHandler` which state of its parent :class:`ConversationHandler`
    to transition into in place of a specified nested state.
    """
    return self._map_to_parent

@map_to_parent.setter
def map_to_parent(self, _: object) -> NoReturn:
    # Read-only after construction: any assignment is rejected
    message = "You can not assign a new value to map_to_parent after initialization."
    raise AttributeError(message)
|
|
|
|
async def _initialize_persistence(
    self, application: "Application"
) -> Dict[str, TrackingDict[ConversationKey, object]]:
    """Sets up persistence for this handler and, recursively, for its child conversations.

    The method is protected only to hide it from users; the Application and parent
    conversations are expected to call it.

    Args:
        application (:class:`telegram.ext.Application`): The application.

    Returns:
        A dict {conversation.name -> TrackingDict} with the conversations dict of this
        handler and of all child conversations.

    Raises:
        :exc:`RuntimeError`: If this handler is not persistent, has no name, or the
            application has no persistence.
    """
    if not self.persistent or not self.name or not application.persistence:
        raise RuntimeError(
            "This handler is not persistent, has no name or the application has no "
            "persistence!"
        )

    # Swap the plain dict for a TrackingDict, keeping the conversations that were already
    # created by updates processed so far
    already_known = self._conversations
    self._conversations = cast(
        TrackingDict[ConversationKey, object],
        TrackingDict(),
    )
    self._conversations.update(already_known)

    # Stored data may partly override the entries above. That's okay, since add_handler
    # warns about it
    stored_data = await application.persistence.get_conversations(self.name)
    self._conversations.update_no_track(stored_data)

    # CH.END is persisted like any other state. Passing it through _update_state here makes
    # sure the conversation is actually ended, i.e. its key is removed from _conversations,
    # and that the entry is dropped from the persisted data on the next run of
    # Application.update_persistence
    for stored_key, stored_state in stored_data.items():
        if stored_state == self.END:
            self._update_state(new_state=self.END, key=stored_key)

    result = {self.name: self._conversations}

    for child in self._child_conversations:
        child_result = await child._initialize_persistence(  # pylint: disable=protected-access
            application=application
        )
        result.update(child_result)

    return result
|
|
|
|
def _get_key(self, update: Update) -> ConversationKey:
    """Assembles the conversation key for an update.

    Depending on :attr:`per_chat`, :attr:`per_user` and :attr:`per_message`, the key
    contains, in this order, the chat id, the user id and the (inline) message id.

    Raises:
        :exc:`RuntimeError`: If the update lacks a piece of information that is required
            by the per_* settings.
    """
    chat = update.effective_chat
    user = update.effective_user

    parts: List[Union[int, str]] = []

    if self.per_chat:
        if chat is None:
            raise RuntimeError("Can't build key for update without effective chat!")
        parts.append(chat.id)

    if self.per_user:
        if user is None:
            raise RuntimeError("Can't build key for update without effective user!")
        parts.append(user.id)

    if self.per_message:
        if update.callback_query is None:
            raise RuntimeError("Can't build key for update without CallbackQuery!")
        # Prefer the inline message id, fall back to the id of the regular message
        parts.append(
            update.callback_query.inline_message_id
            or update.callback_query.message.message_id  # type: ignore[union-attr]
        )

    return tuple(parts)
|
|
|
|
async def _schedule_job_delayed(
    self,
    new_state: asyncio.Task,
    application: "Application[Any, CCT, Any, Any, Any, JobQueue]",
    update: Update,
    context: CCT,
    conversation_key: ConversationKey,
) -> None:
    """Awaits the task of a non-blocking callback and then schedules the timeout job via
    :meth:`_schedule_job` with the state the task returned. If the task raised an exception,
    this is logged at debug level and no timeout is scheduled.
    """
    try:
        resolved_state = await new_state
    except Exception as exc:
        _LOGGER.debug(
            "Non-blocking handler callback raised exception. Not scheduling conversation "
            "timeout.",
            exc_info=exc,
        )
        return None
    else:
        return self._schedule_job(
            new_state=resolved_state,
            application=application,
            update=update,
            context=context,
            conversation_key=conversation_key,
        )
|
|
|
|
def _schedule_job(
    self,
    new_state: object,
    application: "Application[Any, CCT, Any, Any, Any, JobQueue]",
    update: Update,
    context: CCT,
    conversation_key: ConversationKey,
) -> None:
    """Schedules a job that runs :meth:`_trigger_timeout` once the conversation timed out.

    Nothing is scheduled if ``new_state`` equals :attr:`END`. Failures while scheduling are
    logged and not propagated.
    """
    if new_state == self.END:
        return

    try:
        timeout_data = _ConversationTimeoutContext(
            conversation_key, update, application, context
        )
        # callers make sure that both job_queue & conversation_timeout are set before
        # calling _schedule_job
        queue = application.job_queue
        job = queue.run_once(  # type: ignore[union-attr]
            self._trigger_timeout,
            self.conversation_timeout,  # type: ignore[arg-type]
            data=timeout_data,
        )
        self.timeout_jobs[conversation_key] = job
    except Exception as exc:
        _LOGGER.exception("Failed to schedule timeout.", exc_info=exc)
|
|
|
|
# pylint: disable=too-many-return-statements
def check_update(self, update: object) -> Optional[_CheckUpdateType[CCT]]:
    """
    Decides whether this conversation handler is responsible for an update and, if so,
    which conversation state and which of its handlers apply.

    Args:
        update (:class:`telegram.Update` | :obj:`object`): Incoming update.

    Returns:
        :obj:`None` if the update is not handled. Otherwise a tuple of the conversation
        state, the conversation key, the selected handler and the check result of that
        handler.

    """
    if not isinstance(update, Update):
        return None
    # Messages in channels are ignored
    if update.channel_post or update.edited_channel_post:
        return None
    # The update must carry everything the per_* settings need to build the key
    if self.per_chat and not update.effective_chat:
        return None
    if self.per_user and not update.effective_user:
        return None
    if self.per_message and not update.callback_query:
        return None
    if update.callback_query and self.per_chat and not update.callback_query.message:
        return None

    def first_accepting(candidates):  # type: ignore[no-untyped-def]
        # Returns (handler, check result) for the first handler whose check_update result
        # is neither None nor False, or None if no handler accepts the update.
        for candidate in candidates:
            outcome = candidate.check_update(update)
            if not (outcome is None or outcome is False):
                return candidate, outcome
        return None

    key = self._get_key(update)
    state = self._conversations.get(key)

    # A PendingState means that a non-blocking callback was started for this conversation
    if isinstance(state, PendingState):
        _LOGGER.debug("Waiting for asyncio Task to finish ...")

        if not state.done():
            # Task still running: only the handlers of the WAITING state are considered
            waiting_match = first_accepting(self.states.get(self.WAITING, []))
            if waiting_match is None:
                return None
            waiting_handler, waiting_check = waiting_match
            return self.WAITING, key, waiting_handler, waiting_check

        resolved = state.resolve()
        # Special case if an error was raised in a non-blocking entry-point
        if state.old_state is None and state.task.exception():
            self._conversations.pop(key, None)
            state = None
        else:
            self._update_state(resolved, key)
            state = self._conversations.get(key)

    _LOGGER.debug("Selecting conversation %s with state %s", str(key), str(state))

    match = None

    # Entry points are tried for new conversations and, with allow_reentry, always
    if state is None or self.allow_reentry:
        match = first_accepting(self.entry_points)
        if match is None and state is None:
            return None

    # Still nothing selected: handlers of the current state first, fallbacks second
    if state is not None and match is None:
        match = first_accepting(self.states.get(state, []))
        if match is None:
            match = first_accepting(self.fallbacks)
        if match is None:
            return None

    handler, check = match  # type: ignore[misc]
    return state, key, handler, check  # type: ignore[return-value]
|
|
|
|
async def handle_update(  # type: ignore[override]
    self,
    update: Update,
    application: "Application[Any, CCT, Any, Any, Any, Any]",
    check_result: _CheckUpdateType[CCT],
    context: CCT,
) -> Optional[object]:
    """Run the handler selected by :meth:`check_update` and store the state it produces.

    Args:
        check_result: The result from :meth:`check_update`. For this handler it's a tuple of
            the conversation state, key, handler, and the handler's check result.
        update (:class:`telegram.Update`): Incoming telegram update.
        application (:class:`telegram.ext.Application`): Application that originated the
            update.
        context (:class:`telegram.ext.CallbackContext`): The context as provided by
            the application.

    Returns:
        The value that :attr:`map_to_parent` assigns to the new state, if the new state is
        a key of that mapping. :obj:`None` otherwise.

    Raises:
        ApplicationHandlerStop: If it was raised while running the selected handler. It
            carries the mapped parent state when the new state is a key of
            :attr:`map_to_parent` and no state otherwise.

    """
    current_state, conversation_key, handler, handler_check_result = check_result

    async with self._timeout_jobs_lock:
        # Drop the timeout job registered earlier for this conversation, if there is one
        stale_job = self.timeout_jobs.pop(conversation_key, None)
        if stale_job is not None:
            stale_job.schedule_removal()

    # Where the "block" setting is taken from, in order of precedence:
    #   1. the selected handler
    #   2. this ConversationHandler
    #   3. the defaults of the bot, if it has any
    # Otherwise the default value of the handler is unwrapped.
    if handler.block is not DEFAULT_TRUE:
        run_blocking = handler.block
    elif self._block is not DEFAULT_TRUE:
        run_blocking = self._block
    elif isinstance(application.bot, ExtBot) and application.bot.defaults is not None:
        run_blocking = application.bot.defaults.block
    else:
        run_blocking = DefaultValue.get_value(handler.block)

    handler_stop_raised = False
    try:
        if not run_blocking:
            # Hand the callback to the application as a task; the task itself becomes the
            # (pending) new state
            new_state: object = application.create_task(
                coroutine=handler.handle_update(
                    update, application, handler_check_result, context
                ),
                update=update,
                name=f"ConversationHandler:{update.update_id}:handle_update:non_blocking_cb",
            )
        else:
            new_state = await handler.handle_update(
                update, application, handler_check_result, context
            )
    except ApplicationHandlerStop as stop:
        # Remember the request to stop and re-raise it once the state has been stored
        new_state = stop.state
        handler_stop_raised = True

    async with self._timeout_jobs_lock:
        if self.conversation_timeout:
            job_queue = application.job_queue
            if job_queue is None:
                warn(
                    "Ignoring `conversation_timeout` because the Application has no JobQueue.",
                    stacklevel=1,
                )
            elif not job_queue.scheduler.running:
                warn(
                    "Ignoring `conversation_timeout` because the Applications JobQueue is "
                    "not running.",
                    stacklevel=1,
                )
            elif isinstance(new_state, asyncio.Task):
                # The new state is still being computed, so the timeout job is scheduled
                # from a separate task.
                # checking if the new state is self.END is done in _schedule_job
                application.create_task(
                    self._schedule_job_delayed(
                        new_state, application, update, context, conversation_key
                    ),
                    update=update,
                    name=f"ConversationHandler:{update.update_id}:handle_update:timeout_job",
                )
            else:
                self._schedule_job(new_state, application, update, context, conversation_key)

    parent_map = self.map_to_parent
    if isinstance(parent_map, dict) and new_state in parent_map:
        # This conversation ends and the mapped state is reported to the caller
        self._update_state(self.END, conversation_key, handler)
        parent_state = parent_map.get(new_state)
        if handler_stop_raised:
            raise ApplicationHandlerStop(parent_state)
        return parent_state

    # Updates handled in the WAITING state leave the stored state untouched
    if current_state != self.WAITING:
        self._update_state(new_state, conversation_key, handler)

    if handler_stop_raised:
        # Don't pass the new state here. If we're in a nested conversation, the parent is
        # expecting None as return value.
        raise ApplicationHandlerStop
    # Signals a possible parent conversation to stay in the current state
    return None
|
|
|
|
def _update_state(
    self, new_state: object, key: ConversationKey, handler: Optional[BaseHandler] = None
) -> None:
    """Store the outcome of a handler callback as the state of a conversation.

    Args:
        new_state: The value produced by the callback. ``self.END`` removes the conversation,
            an :class:`asyncio.Task` is stored wrapped in a ``PendingState`` together with
            the state stored so far, :obj:`None` leaves the stored state untouched and any
            other value is stored as it is.
        key: The key of the conversation in ``self._conversations``.
        handler: The handler whose callback produced ``new_state``. Only used to name the
            callback in the warning about states that are not part of ``self.states``.

    """
    if new_state == self.END:
        # Nothing is done if the conversation is not stored (anymore)
        self._conversations.pop(key, None)

    elif isinstance(new_state, asyncio.Task):
        # Keep the state stored so far next to the task that computes the next one
        self._conversations[key] = PendingState(
            old_state=self._conversations.get(key), task=new_state
        )

    elif new_state is not None:
        if new_state not in self.states:
            if handler is None:
                origin = "BaseHandler"
            else:
                # functools.partial objects and callable instances have no ``__name__``;
                # describe those by their repr instead of failing while building a warning
                origin = repr(getattr(handler.callback, "__name__", handler.callback))
            warn(
                f"{origin} "
                f"returned state {new_state} which is unknown to the "
                f"ConversationHandler{' ' + self.name if self.name is not None else ''}.",
                stacklevel=2,
            )
        # An unknown state is only warned about, it is stored nevertheless
        self._conversations[key] = new_state
|
|
|
|
async def _trigger_timeout(self, context: CCT) -> None:
    """Handle a conversation that has timed out.

    Every handler registered for the :attr:`TIMEOUT` state whose
    :meth:`BaseHandler.check_update` accepts the stored update is run, and afterwards the
    conversation is ended. Nothing happens if the job that triggered this call is no longer
    the one stored in ``timeout_jobs`` for the conversation.

    Args:
        context: The context of the triggered job. ``context.job.data`` is read as the
            ``_ConversationTimeoutContext`` of the conversation.

    """
    triggering_job = cast("Job", context.job)
    timeout_ctx = cast(_ConversationTimeoutContext, triggering_job.data)
    conv_key = timeout_ctx.conversation_key

    _LOGGER.debug("Conversation timeout was triggered for conversation %s!", conv_key)

    callback_context = timeout_ctx.callback_context

    async with self._timeout_jobs_lock:
        if self.timeout_jobs.get(conv_key) is not triggering_job:
            # The timeout has been cancelled in handle_update
            return
        del self.timeout_jobs[conv_key]

    # Run each TIMEOUT handler that accepts the update stored for this conversation
    for timeout_handler in self.states.get(self.TIMEOUT, []):
        check = timeout_handler.check_update(timeout_ctx.update)
        if check is None or check is False:
            continue
        try:
            await timeout_handler.handle_update(
                timeout_ctx.update, timeout_ctx.application, check, callback_context
            )
        except ApplicationHandlerStop:
            # The remaining TIMEOUT handlers are still run
            warn(
                "ApplicationHandlerStop in TIMEOUT state of "
                "ConversationHandler has no effect. Ignoring.",
                stacklevel=2,
            )

    self._update_state(self.END, conv_key)
|