#!/usr/bin/env python # pylint: disable=unused-argument # This program is dedicated to the public domain under the CC0 license. """ Simple Bot to handle '(my_)chat_member' updates. Greets new users & keeps track of which chats the bot is in. Usage: Press Ctrl-C on the command line or send a signal to the process to stop the bot. """ import logging from typing import Optional, Tuple from telegram import Chat, ChatMember, ChatMemberUpdated, Update from telegram.constants import ParseMode from telegram.ext import Application, CallbackContext, ChatMemberHandler, CommandHandler # Enable logging logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO ) logger = logging.getLogger(__name__) def extract_status_change( chat_member_update: ChatMemberUpdated, ) -> Optional[Tuple[bool, bool]]: """Takes a ChatMemberUpdated instance and extracts whether the 'old_chat_member' was a member of the chat and whether the 'new_chat_member' is a member of the chat. Returns None, if the status didn't change. """ status_change = chat_member_update.difference().get("status") old_is_member, new_is_member = chat_member_update.difference().get("is_member", (None, None)) if status_change is None: return None old_status, new_status = status_change was_member = old_status in [ ChatMember.MEMBER, ChatMember.OWNER, ChatMember.ADMINISTRATOR, ] or (old_status == ChatMember.RESTRICTED and old_is_member is True) is_member = new_status in [ ChatMember.MEMBER, ChatMember.OWNER, ChatMember.ADMINISTRATOR, ] or (new_status == ChatMember.RESTRICTED and new_is_member is True) return was_member, is_member async def track_chats(update: Update, context: CallbackContext.DEFAULT_TYPE) -> None: """Tracks the chats the bot is in.""" result = extract_status_change(update.my_chat_member) if result is None: return was_member, is_member = result # Let's check who is responsible for the change cause_name = update.effective_user.full_name # Handle chat types differently: chat = update.effective_chat if chat.type == Chat.PRIVATE: if not was_member and is_member: logger.info("%s started the bot", cause_name) context.bot_data.setdefault("user_ids", set()).add(chat.id) elif was_member and not is_member: logger.info("%s blocked the bot", cause_name) context.bot_data.setdefault("user_ids", set()).discard(chat.id) elif chat.type in [Chat.GROUP, Chat.SUPERGROUP]: if not was_member and is_member: logger.info("%s added the bot to the group %s", cause_name, chat.title) context.bot_data.setdefault("group_ids", set()).add(chat.id) elif was_member and not is_member: logger.info("%s removed the bot from the group %s", cause_name, chat.title) context.bot_data.setdefault("group_ids", set()).discard(chat.id) else: if not was_member and is_member: logger.info("%s added the bot to the channel %s", cause_name, chat.title) context.bot_data.setdefault("channel_ids", set()).add(chat.id) elif was_member and not is_member: logger.info("%s removed the bot from the channel %s", cause_name, chat.title) context.bot_data.setdefault("channel_ids", set()).discard(chat.id) async def show_chats(update: Update, context: CallbackContext.DEFAULT_TYPE) -> None: """Shows which chats the bot is in""" user_ids = ", ".join(str(uid) for uid in context.bot_data.setdefault("user_ids", set())) group_ids = ", ".join(str(gid) for gid in context.bot_data.setdefault("group_ids", set())) channel_ids = ", ".join(str(cid) for cid in context.bot_data.setdefault("channel_ids", set())) text = ( f"@{context.bot.username} is currently in a conversation with the user IDs {user_ids}." f" Moreover it is a member of the groups with IDs {group_ids} " f"and administrator in the channels with IDs {channel_ids}." ) await update.effective_message.reply_text(text) async def greet_chat_members(update: Update, context: CallbackContext.DEFAULT_TYPE) -> None: """Greets new users in chats and announces when someone leaves""" result = extract_status_change(update.chat_member) if result is None: return was_member, is_member = result cause_name = update.chat_member.from_user.mention_html() member_name = update.chat_member.new_chat_member.user.mention_html() if not was_member and is_member: await update.effective_chat.send_message( f"{member_name} was added by {cause_name}. Welcome!", parse_mode=ParseMode.HTML, ) elif was_member and not is_member: await update.effective_chat.send_message( f"{member_name} is no longer with us. Thanks a lot, {cause_name} ...", parse_mode=ParseMode.HTML, ) def main() -> None: """Start the bot.""" # Create the Application and pass it your bot's token. application = Application.builder().token("TOKEN").build() # Keep track of which chats the bot is in application.add_handler(ChatMemberHandler(track_chats, ChatMemberHandler.MY_CHAT_MEMBER)) application.add_handler(CommandHandler("show_chats", show_chats)) # Handle members joining/leaving chats. application.add_handler(ChatMemberHandler(greet_chat_members, ChatMemberHandler.CHAT_MEMBER)) # Run the bot until the user presses Ctrl-C # We pass 'allowed_updates' handle *all* updates including `chat_member` updates # To reset this, simply pass `allowed_updates=[]` application.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()