python-telegram-bot/examples/chatmemberbot.py
Harshil 335772568f Update and Expand Tests & pre-commit Settings and Improve Code Quality (#2925)
Co-authored-by: Hinrich Mahler <22366557+Bibo-Joshi@users.noreply.github.com>
2022-05-06 18:22:34 +02:00

147 lines
5.7 KiB
Python

#!/usr/bin/env python
# pylint: disable=unused-argument
# This program is dedicated to the public domain under the CC0 license.
"""
Simple Bot to handle '(my_)chat_member' updates.
Greets new users & keeps track of which chats the bot is in.
Usage:
Press Ctrl-C on the command line or send a signal to the process to stop the
bot.
"""
import logging
from typing import Optional, Tuple
from telegram import Chat, ChatMember, ChatMemberUpdated, Update
from telegram.constants import ParseMode
from telegram.ext import Application, CallbackContext, ChatMemberHandler, CommandHandler
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
def extract_status_change(
chat_member_update: ChatMemberUpdated,
) -> Optional[Tuple[bool, bool]]:
"""Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member
of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if
the status didn't change.
"""
status_change = chat_member_update.difference().get("status")
old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None))
if status_change is None:
return None
old_status, new_status = status_change
was_member = old_status in [
ChatMember.MEMBER,
ChatMember.OWNER,
ChatMember.ADMINISTRATOR,
] or (old_status == ChatMember.RESTRICTED and old_is_member is True)
is_member = new_status in [
ChatMember.MEMBER,
ChatMember.OWNER,
ChatMember.ADMINISTRATOR,
] or (new_status == ChatMember.RESTRICTED and new_is_member is True)
return was_member, is_member
async def track_chats(update: Update, context: CallbackContext.DEFAULT_TYPE) -> None:
"""Tracks the chats the bot is in."""
result = extract_status_change(update.my_chat_member)
if result is None:
return
was_member, is_member = result
# Let's check who is responsible for the change
cause_name = update.effective_user.full_name
# Handle chat types differently:
chat = update.effective_chat
if chat.type == Chat.PRIVATE:
if not was_member and is_member:
logger.info("%s started the bot", cause_name)
context.bot_data.setdefault("user_ids", set()).add(chat.id)
elif was_member and not is_member:
logger.info("%s blocked the bot", cause_name)
context.bot_data.setdefault("user_ids", set()).discard(chat.id)
elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]:
if not was_member and is_member:
logger.info("%s added the bot to the group %s", cause_name, chat.title)
context.bot_data.setdefault("group_ids", set()).add(chat.id)
elif was_member and not is_member:
logger.info("%s removed the bot from the group %s", cause_name, chat.title)
context.bot_data.setdefault("group_ids", set()).discard(chat.id)
else:
if not was_member and is_member:
logger.info("%s added the bot to the channel %s", cause_name, chat.title)
context.bot_data.setdefault("channel_ids", set()).add(chat.id)
elif was_member and not is_member:
logger.info("%s removed the bot from the channel %s", cause_name, chat.title)
context.bot_data.setdefault("channel_ids", set()).discard(chat.id)
async def show_chats(update: Update, context: CallbackContext.DEFAULT_TYPE) -> None:
"""Shows which chats the bot is in"""
user_ids = ", ".join(str(uid) for uid in context.bot_data.setdefault("user_ids", set()))
group_ids = ", ".join(str(gid) for gid in context.bot_data.setdefault("group_ids", set()))
channel_ids = ", ".join(str(cid) for cid in context.bot_data.setdefault("channel_ids", set()))
text = (
f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}."
f" Moreover it is a member of the groups with IDs {group_ids} "
f"and administrator in the channels with IDs {channel_ids}."
)
await update.effective_message.reply_text(text)
async def greet_chat_members(update: Update, context: CallbackContext.DEFAULT_TYPE) -> None:
"""Greets new users in chats and announces when someone leaves"""
result = extract_status_change(update.chat_member)
if result is None:
return
was_member, is_member = result
cause_name = update.chat_member.from_user.mention_html()
member_name = update.chat_member.new_chat_member.user.mention_html()
if not was_member and is_member:
await update.effective_chat.send_message(
f"{member_name} was added by {cause_name}. Welcome!",
parse_mode=ParseMode.HTML,
)
elif was_member and not is_member:
await update.effective_chat.send_message(
f"{member_name} is no longer with us. Thanks a lot, {cause_name} ...",
parse_mode=ParseMode.HTML,
)
def main() -> None:
"""Start the bot."""
# Create the Application and pass it your bot's token.
application = Application.builder().token("TOKEN").build()
# Keep track of which chats the bot is in
application.add_handler(ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER))
application.add_handler(CommandHandler("show_chats", show_chats))
# Handle members joining/leaving chats.
application.add_handler(ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER))
# Run the bot until the user presses Ctrl-C
# We pass 'allowed_updates' handle *all* updates including `chat_member` updates
# To reset this, simply pass `allowed_updates=[]`
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()