Convenience Utilities & Example for Handling ChatMemberUpdated (#2490)

* ChatMemberUpdate.difference

* constants for update types

* Add an example for ChatMemberHandler

* Update examples/chatmemberbot.py

* Review

* Review
This commit is contained in:
Bibo-Joshi 2021-04-30 10:14:41 +02:00 committed by GitHub
parent 4645d0e32a
commit 3938a57542
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 390 additions and 2 deletions

View file

@ -46,5 +46,8 @@ A basic example of a bot that can accept payments. Don't forget to enable and co
### [`errorhandlerbot.py`](https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/errorhandlerbot.py)
A basic example on how to set up a custom error handler.
### [`chatmemberbot.py`](https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/chatmemberbot.py)
A basic example on how `(my_)chat_member` updates can be used.
## Pure API
The [`rawapibot.py`](https://github.com/python-telegram-bot/python-telegram-bot/blob/master/examples/rawapibot.py) example uses only the pure, "bare-metal" API wrapper.

167
examples/chatmemberbot.py Normal file
View file

@ -0,0 +1,167 @@
#!/usr/bin/env python
# pylint: disable=C0116
# This program is dedicated to the public domain under the CC0 license.
"""
Simple Bot to handle '(my_)chat_member' updates.
Greets new users & keeps track of which chats the bot is in.
Usage:
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
import logging
from typing import Tuple, Optional
from telegram import Update, Chat, ChatMember, ParseMode, ChatMemberUpdated
from telegram.ext import (
Updater,
CommandHandler,
CallbackContext,
ChatMemberHandler,
)
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
def extract_status_change(
chat_member_update: ChatMemberUpdated,
) -> Optional[Tuple[bool, bool]]:
"""Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if
the status didn't change."""
status_change = chat_member_update.difference().get("status")
old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None))
if status_change is None:
return None
old_status, new_status = status_change
was_member = (
old_status
in [
ChatMember.MEMBER,
ChatMember.CREATOR,
ChatMember.ADMINISTRATOR,
]
or (old_status == ChatMember.RESTRICTED and old_is_member is True)
)
is_member = (
new_status
in [
ChatMember.MEMBER,
ChatMember.CREATOR,
ChatMember.ADMINISTRATOR,
]
or (new_status == ChatMember.RESTRICTED and new_is_member is True)
)
return was_member, is_member
def track_chats(update: Update, context: CallbackContext) -> None:
"""Tracks the chats the bot is in."""
result = extract_status_change(update.my_chat_member)
if result is None:
return
was_member, is_member = result
# Let's check who is responsible for the change
cause_name = update.effective_user.full_name
# Handle chat types differently:
chat = update.effective_chat
if chat.type == Chat.PRIVATE:
if not was_member and is_member:
logger.info("%s started the bot", cause_name)
context.bot_data.setdefault("user_ids", set()).add(chat.id)
elif was_member and not is_member:
logger.info("%s blocked the bot", cause_name)
context.bot_data.setdefault("user_ids", set()).discard(chat.id)
elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
if not was_member and is_member:
logger.info("%s added the bot to the group %s", cause_name, chat.title)
context.bot_data.setdefault("group_ids", set()).add(chat.id)
elif was_member and not is_member:
logger.info("%s removed the bot from the group %s", cause_name, chat.title)
context.bot_data.setdefault("group_ids", set()).discard(chat.id)
else:
if not was_member and is_member:
logger.info("%s added the bot to the channel %s", cause_name, chat.title)
context.bot_data.setdefault("channel_ids", set()).add(chat.id)
elif was_member and not is_member:
logger.info("%s removed the bot from the channel %s", cause_name, chat.title)
context.bot_data.setdefault("channel_ids", set()).discard(chat.id)
def show_chats(update: Update, context: CallbackContext) -> None:
"""Shows which chats the bot is in"""
user_ids = ", ".join(str(uid) for uid in context.bot_data.setdefault("user_ids", set()))
group_ids = ", ".join(str(gid) for gid in context.bot_data.setdefault("group_ids", set()))
channel_ids = ", ".join(str(cid) for cid in context.bot_data.setdefault("channel_ids", set()))
text = (
f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
f" Moreover it is a member of the groups with IDs {group_ids} "
f"and administrator in the channels with IDs {channel_ids}."
)
update.effective_message.reply_text(text)
def greet_chat_members(update: Update, _: CallbackContext) -> None:
"""Greets new users in chats and announces when someone leaves"""
result = extract_status_change(update.chat_member)
if result is None:
return
was_member, is_member = result
cause_name = update.chat_member.from_user.mention_html()
member_name = update.chat_member.new_chat_member.user.mention_html()
if not was_member and is_member:
update.effective_chat.send_message(
f"{member_name} was added by {cause_name}. Welcome!",
parse_mode=ParseMode.HTML,
)
elif was_member and not is_member:
update.effective_chat.send_message(
f"{member_name} is no longer with us. Thanks a lot, {cause_name} ...",
parse_mode=ParseMode.HTML,
)
def main() -> None:
"""Start the bot."""
# Create the Updater and pass it your bot's token.
updater = Updater("TOKEN")
# Get the dispatcher to register handlers
dispatcher = updater.dispatcher
# Keep track of which chats the bot is in
dispatcher.add_handler(ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER))
dispatcher.add_handler(CommandHandler("show_chats", show_chats))
# Handle members joining/leaving chats.
dispatcher.add_handler(ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER))
# Start the Bot
# We pass 'allowed_updates' to *only* handle updates with '(my_)chat_member' or 'message'
# If you want to handle *all* updates, pass Update.ALL_TYPES
updater.start_polling(
allowed_updates=[Update.MESSAGE, Update.CHAT_MEMBER, Update.MY_CHAT_MEMBER]
)
# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == "__main__":
main()

View file

@ -18,7 +18,7 @@
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains an object that represents a Telegram ChatMemberUpdated."""
import datetime
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, Dict, Tuple, Union
from telegram import TelegramObject, User, Chat, ChatMember, ChatInviteLink
from telegram.utils.helpers import from_timestamp, to_timestamp
@ -113,3 +113,49 @@ class ChatMemberUpdated(TelegramObject):
data['date'] = to_timestamp(self.date)
return data
def difference(
self,
) -> Dict[
str,
Tuple[
Union[str, bool, datetime.datetime, User], Union[str, bool, datetime.datetime, User]
],
]:
"""Computes the difference between :attr:`old_chat_member` and :attr:`new_chat_member`.
Example:
.. code:: python
>>> chat_member_updated.difference()
{'custom_title': ('old title', 'new title')}
Note:
To determine, if the :attr:`telegram.ChatMember.user` attribute has changed, *every*
attribute of the user will be checked.
.. versionadded:: 13.5
Returns:
Dict[:obj:`str`, Tuple[:obj:`obj`, :obj:`obj`]]: A dictionary mapping attribute names
to tuples of the form ``(old_value, new_value)``
"""
# we first get the names of the attributes that have changed
# user.to_dict() is unhashable, so that needs some special casing further down
old_dict = self.old_chat_member.to_dict()
old_user_dict = old_dict.pop('user')
new_dict = self.new_chat_member.to_dict()
new_user_dict = new_dict.pop('user')
# Generator for speed: we only need to iterate over it once
# we can't directly use the values from old_dict ^ new_dict b/c that set is unordered
attributes = (entry[0] for entry in set(old_dict.items()) ^ set(new_dict.items()))
result = {
attribute: (self.old_chat_member[attribute], self.new_chat_member[attribute])
for attribute in attributes
}
if old_user_dict != new_user_dict:
result['user'] = (self.old_chat_member.user, self.new_chat_member.user)
return result # type: ignore[return-value]

View file

@ -150,7 +150,7 @@ Attributes:
MAX_POLL_QUESTION_LENGTH (:obj:`int`): 300
MAX_POLL_OPTION_LENGTH (:obj:`int`): 100
:class:`telegram.files.MaskPosition`:
:class:`telegram.MaskPosition`:
Attributes:
STICKER_FOREHEAD (:obj:`str`): 'forehead'
@ -158,6 +158,52 @@ Attributes:
STICKER_MOUTH (:obj:`str`): 'mouth'
STICKER_CHIN (:obj:`str`): 'chin'
:class:`telegram.Update`:
Attributes:
UPDATE_MESSAGE (:obj:`str`): ``'message'``
.. versionadded:: 13.5
UPDATE_EDITED_MESSAGE (:obj:`str`): ``'edited_message'``
.. versionadded:: 13.5
UPDATE_CHANNEL_POST (:obj:`str`): ``'channel_post'``
.. versionadded:: 13.5
UPDATE_EDITED_CHANNEL_POST (:obj:`str`): ``'edited_channel_post'``
.. versionadded:: 13.5
UPDATE_INLINE_QUERY (:obj:`str`): ``'inline_query'``
.. versionadded:: 13.5
UPDATE_CHOSEN_INLINE_RESULT (:obj:`str`): ``'chosen_inline_result'``
.. versionadded:: 13.5
UPDATE_CALLBACK_QUERY (:obj:`str`): ``'callback_query'``
.. versionadded:: 13.5
UPDATE_SHIPPING_QUERY (:obj:`str`): ``'shipping_query'``
.. versionadded:: 13.5
UPDATE_PRE_CHECKOUT_QUERY (:obj:`str`): ``'pre_checkout_query'``
.. versionadded:: 13.5
UPDATE_POLL (:obj:`str`): ``'poll'``
.. versionadded:: 13.5
UPDATE_POLL_ANSWER (:obj:`str`): ``'poll_answer'``
.. versionadded:: 13.5
UPDATE_MY_CHAT_MEMBER (:obj:`str`): ``'my_chat_member'``
.. versionadded:: 13.5
UPDATE_CHAT_MEMBER (:obj:`str`): ``'chat_member'``
.. versionadded:: 13.5
UPDATE_ALL_TYPES (List[:obj:`str`]): List of all update types.
.. versionadded:: 13.5
"""
from typing import List
@ -267,3 +313,32 @@ STICKER_FOREHEAD: str = 'forehead'
STICKER_EYES: str = 'eyes'
STICKER_MOUTH: str = 'mouth'
STICKER_CHIN: str = 'chin'
UPDATE_MESSAGE = 'message'
UPDATE_EDITED_MESSAGE = 'edited_message'
UPDATE_CHANNEL_POST = 'channel_post'
UPDATE_EDITED_CHANNEL_POST = 'edited_channel_post'
UPDATE_INLINE_QUERY = 'inline_query'
UPDATE_CHOSEN_INLINE_RESULT = 'chosen_inline_result'
UPDATE_CALLBACK_QUERY = 'callback_query'
UPDATE_SHIPPING_QUERY = 'shipping_query'
UPDATE_PRE_CHECKOUT_QUERY = 'pre_checkout_query'
UPDATE_POLL = 'poll'
UPDATE_POLL_ANSWER = 'poll_answer'
UPDATE_MY_CHAT_MEMBER = 'my_chat_member'
UPDATE_CHAT_MEMBER = 'chat_member'
UPDATE_ALL_TYPES = [
UPDATE_MESSAGE,
UPDATE_EDITED_MESSAGE,
UPDATE_CHANNEL_POST,
UPDATE_EDITED_CHANNEL_POST,
UPDATE_INLINE_QUERY,
UPDATE_CHOSEN_INLINE_RESULT,
UPDATE_CALLBACK_QUERY,
UPDATE_SHIPPING_QUERY,
UPDATE_PRE_CHECKOUT_QUERY,
UPDATE_POLL,
UPDATE_POLL_ANSWER,
UPDATE_MY_CHAT_MEMBER,
UPDATE_CHAT_MEMBER,
]

View file

@ -30,6 +30,7 @@ from telegram import (
ShippingQuery,
TelegramObject,
ChatMemberUpdated,
constants,
)
from telegram.poll import PollAnswer
from telegram.utils.types import JSONDict
@ -124,6 +125,63 @@ class Update(TelegramObject):
"""
MESSAGE = constants.UPDATE_MESSAGE
""":const:`telegram.constants.UPDATE_MESSAGE`
.. versionadded:: 13.5"""
EDITED_MESSAGE = constants.UPDATE_EDITED_MESSAGE
""":const:`telegram.constants.UPDATE_EDITED_MESSAGE`
.. versionadded:: 13.5"""
CHANNEL_POST = constants.UPDATE_CHANNEL_POST
""":const:`telegram.constants.UPDATE_CHANNEL_POST`
.. versionadded:: 13.5"""
EDITED_CHANNEL_POST = constants.UPDATE_EDITED_CHANNEL_POST
""":const:`telegram.constants.UPDATE_EDITED_CHANNEL_POST`
.. versionadded:: 13.5"""
INLINE_QUERY = constants.UPDATE_INLINE_QUERY
""":const:`telegram.constants.UPDATE_INLINE_QUERY`
.. versionadded:: 13.5"""
CHOSEN_INLINE_RESULT = constants.UPDATE_CHOSEN_INLINE_RESULT
""":const:`telegram.constants.UPDATE_CHOSEN_INLINE_RESULT`
.. versionadded:: 13.5"""
CALLBACK_QUERY = constants.UPDATE_CALLBACK_QUERY
""":const:`telegram.constants.UPDATE_CALLBACK_QUERY`
.. versionadded:: 13.5"""
SHIPPING_QUERY = constants.UPDATE_SHIPPING_QUERY
""":const:`telegram.constants.UPDATE_SHIPPING_QUERY`
.. versionadded:: 13.5"""
PRE_CHECKOUT_QUERY = constants.UPDATE_PRE_CHECKOUT_QUERY
""":const:`telegram.constants.UPDATE_PRE_CHECKOUT_QUERY`
.. versionadded:: 13.5"""
POLL = constants.UPDATE_POLL
""":const:`telegram.constants.UPDATE_POLL`
.. versionadded:: 13.5"""
POLL_ANSWER = constants.UPDATE_POLL_ANSWER
""":const:`telegram.constants.UPDATE_POLL_ANSWER`
.. versionadded:: 13.5"""
MY_CHAT_MEMBER = constants.UPDATE_MY_CHAT_MEMBER
""":const:`telegram.constants.UPDATE_MY_CHAT_MEMBER`
.. versionadded:: 13.5"""
CHAT_MEMBER = constants.UPDATE_CHAT_MEMBER
""":const:`telegram.constants.UPDATE_CHAT_MEMBER`
.. versionadded:: 13.5"""
ALL_TYPES = constants.UPDATE_ALL_TYPES
""":const:`telegram.constants.UPDATE_ALL_TYPES`
.. versionadded:: 13.5"""
def __init__(
self,
update_id: int,

View file

@ -17,6 +17,7 @@
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import datetime
import inspect
import pytest
import pytz
@ -175,3 +176,41 @@ class TestChatMemberUpdated:
for other in [c, d, e, f, g]:
assert a != other
assert hash(a) != hash(other)
def test_difference_required(self, user, chat):
old_chat_member = ChatMember(user, 'old_status')
new_chat_member = ChatMember(user, 'new_status')
chat_member_updated = ChatMemberUpdated(
chat, user, datetime.datetime.utcnow(), old_chat_member, new_chat_member
)
assert chat_member_updated.difference() == {'status': ('old_status', 'new_status')}
# We deliberately change an optional argument here to make sure that comparision doesn't
# just happens by id/required args
new_user = User(1, 'First name', False, last_name='last name')
new_chat_member.user = new_user
assert chat_member_updated.difference() == {
'status': ('old_status', 'new_status'),
'user': (user, new_user),
}
@pytest.mark.parametrize(
'optional_attribute',
# This gives the names of all optional arguments of ChatMember
[
name
for name, param in inspect.signature(ChatMember).parameters.items()
if name != 'self' and param.default != inspect.Parameter.empty
],
)
def test_difference_optionals(self, optional_attribute, user, chat):
# we use datetimes here, because we need that for `until_date` and it doesn't matter for
# the other attributes
old_value = datetime.datetime(2020, 1, 1)
new_value = datetime.datetime(2021, 1, 1)
old_chat_member = ChatMember(user, 'status', **{optional_attribute: old_value})
new_chat_member = ChatMember(user, 'status', **{optional_attribute: new_value})
chat_member_updated = ChatMemberUpdated(
chat, user, datetime.datetime.utcnow(), old_chat_member, new_chat_member
)
assert chat_member_updated.difference() == {optional_attribute: (old_value, new_value)}