mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-11-24 16:17:37 +01:00
update most & remove some snippets
parent
23e2a175e6
commit
81796a0809
1 changed files with 24 additions and 208 deletions
232
Code-snippets.md
232
Code-snippets.md
|
@ -59,6 +59,7 @@ To fetch messages sent to your Bot, you can use the [getUpdates](https://core.te
|
|||
updates = await bot.get_updates()
|
||||
print([u.message.text for u in updates])
|
||||
```
|
||||
|
||||
---
|
||||
#### Fetch images sent to your Bot
|
||||
|
||||
|
@ -510,12 +511,16 @@ slot_machine_value = {
|
|||
---
|
||||
#### Get the new members message
|
||||
```python
|
||||
def add_group(update: Update, context: CallbackContext):
|
||||
for member in update.message.new_chat_members:
|
||||
update.message.reply_text(f"{member.full_name} just joined the group")
|
||||
import asyncio
|
||||
|
||||
add_group_handle = MessageHandler(Filters.status_update.new_chat_members, add_group)
|
||||
dispatcher.add_handler(add_group_handle)
|
||||
async def add_group(update: Update, context: CallbackContext):
|
||||
await asyncio.gather(
|
||||
update.message.reply_text(f"{member.full_name} just joined the group")
|
||||
for member in update.message.new_chat_members
|
||||
)
|
||||
|
||||
add_group_handle = MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS , add_group)
|
||||
application.add_handler(add_group_handle)
|
||||
```
|
||||
Note that service messages about non-bot users joining the chat are removed from large groups. You can get the new members message by following the [chatmemberbot.py example](https://github.com/python-telegram-bot/python-telegram-bot/tree/master/examples#chatmemberbotpy).
|
||||
|
||||
|
@ -523,14 +528,14 @@ Note that service messages about non-bot users joining the chat are removed from
|
|||
#### Exclude forwarded channel posts in discussion groups from MessageHandlers
|
||||
If you're using `MessageHandlers` and do not want them to respond to the channel posts automatically forwarded to the discussion group linked to your channel, you can use this filter in your `MessageHandler` (requires PTB v13.9+):
|
||||
```python
|
||||
~ Filters.is_automatic_forward
|
||||
~ filters.IS_AUTOMATIC_FORWARD
|
||||
```
|
||||
|
||||
---
|
||||
#### Exclude Messages from anonymous Admins
|
||||
If you're using `MessageHandlers` and do not want them to respond to messages from anonymous admins, you can use this filter in your `MessageHandler`:
|
||||
```python
|
||||
~ Filters.sender_chat.super_group
|
||||
~ filters.SenderChat.SUPER_GROUP
|
||||
```
|
||||
|
||||
---
|
||||
|
@ -544,7 +549,7 @@ This decorator allows you to register a function as a command handler in a _Flas
|
|||
def command_handler(command):
|
||||
def decorator(func):
|
||||
handler = CommandHandler(command, func)
|
||||
dispatcher.add_handler(handler)
|
||||
application.add_handler(handler)
|
||||
return func
|
||||
return decorator
|
||||
```
|
||||
|
@ -555,8 +560,8 @@ Add the `@command_handler(command)` decorator on top of your handler function:
|
|||
|
||||
```python
|
||||
@command_handler("hello")
|
||||
def hello(update, context):
|
||||
context.bot.send_message(chat_id=update.effective_chat.id, text="Hello world!")
|
||||
async def hello(update, context):
|
||||
await context.bot.send_message(chat_id=update.effective_chat.id, text="Hello world!")
|
||||
```
|
||||
|
||||
**Note**: You can modify this decorator in order to register any type of handler (see [[Types Of Handlers|Types-Of-Handlers]]). Please also note that PTB deliberately does not provide such functionality out of the box due to the reasons mentioned in [#899](https://github.com/python-telegram-bot/python-telegram-bot/issues/899).
|
||||
|
@ -564,9 +569,6 @@ def hello(update, context):
|
|||
---
|
||||
#### Restrict access to a handler (decorator)
|
||||
|
||||
<!--- The extraction of the user_id is going to be built in on python-telegram-bot version 6.0.
|
||||
TODO: Upon release, update this snippet!--->
|
||||
|
||||
This decorator allows you to restrict the access of a handler to only the `user_ids` specified in `LIST_OF_ADMINS`.
|
||||
|
||||
```python
|
||||
|
@ -576,12 +578,12 @@ LIST_OF_ADMINS = [12345678, 87654321]
|
|||
|
||||
def restricted(func):
|
||||
@wraps(func)
|
||||
def wrapped(update, context, *args, **kwargs):
|
||||
async def wrapped(update, context, *args, **kwargs):
|
||||
user_id = update.effective_user.id
|
||||
if user_id not in LIST_OF_ADMINS:
|
||||
print("Unauthorized access denied for {}.".format(user_id))
|
||||
return
|
||||
return func(update, context, *args, **kwargs)
|
||||
return await func(update, context, *args, **kwargs)
|
||||
return wrapped
|
||||
```
|
||||
|
||||
|
@ -591,7 +593,7 @@ Add a `@restricted` decorator on top of your handler declaration:
|
|||
|
||||
```python
|
||||
@restricted
|
||||
def my_handler(update, context):
|
||||
async def my_handler(update, context):
|
||||
pass # only accessible if `user_id` is in `LIST_OF_ADMINS`.
|
||||
```
|
||||
|
||||
|
@ -606,9 +608,9 @@ def send_action(action):
|
|||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def command_func(update, context, *args, **kwargs):
|
||||
context.bot.send_chat_action(chat_id=update.effective_message.chat_id, action=action)
|
||||
return func(update, context, *args, **kwargs)
|
||||
async def command_func(update, context, *args, **kwargs):
|
||||
await context.bot.send_chat_action(chat_id=update.effective_message.chat_id, action=action)
|
||||
return await func(update, context, *args, **kwargs)
|
||||
return command_func
|
||||
|
||||
return decorator
|
||||
|
@ -626,11 +628,11 @@ send_upload_photo_action = send_action(ChatAction.UPLOAD_PHOTO)
|
|||
With the above aliases, the following decorators are equivalent
|
||||
```python
|
||||
@send_typing_action
|
||||
def my_handler(update, context):
|
||||
async def my_handler(update, context):
|
||||
pass # user will see 'typing' while your bot is handling the request.
|
||||
|
||||
@send_action(ChatAction.TYPING)
|
||||
def my_handler(update, context):
|
||||
async def my_handler(update, context):
|
||||
pass # user will see 'typing' while your bot is handling the request.
|
||||
```
|
||||
All possible actions are documented [here](https://core.telegram.org/bots/api#sendchataction).
|
||||
|
@ -673,7 +675,7 @@ button_list = [
|
|||
InlineKeyboardButton("row 2", callback_data=...)
|
||||
]
|
||||
reply_markup = InlineKeyboardMarkup(util.build_menu(button_list, n_cols=2))
|
||||
bot.send_message(..., "A two-column menu", reply_markup=reply_markup)
|
||||
await bot.send_message(..., "A two-column menu", reply_markup=reply_markup)
|
||||
```
|
||||
|
||||
Or, if you need a dynamic version, use list comprehension to generate your `button_list` dynamically from a list of strings:
|
||||
|
@ -687,192 +689,6 @@ This is especially useful if put inside a helper method like `get_data_buttons`
|
|||
|
||||
To handle the `callback_data`, you need to set a `CallbackQueryHandler`.
|
||||
|
||||
---
|
||||
#### Cached Telegram group administrator check
|
||||
If you want to limit certain bot functions to group administrators, you have to test if a user is an administrator in the group in question. This however requires an extra API request, which is why it can make sense to cache this information for a certain time, especially if your bot is very busy.
|
||||
|
||||
This snippet requires [this timeout-based cache decorator](http://code.activestate.com/recipes/325905-memoize-decorator-with-timeout/#c1). ([gist mirror](https://gist.github.com/jh0ker/56f5b4fb7d015b1b9e4c74d4a91d4568))
|
||||
|
||||
Save the decorator to a new file named `mwt.py` and add this line to your imports:
|
||||
```python
|
||||
from mwt import MWT
|
||||
```
|
||||
|
||||
Then, add the following decorated function to your script. You can change the timeout as required.
|
||||
```python
|
||||
@MWT(timeout=60*60)
|
||||
def get_admin_ids(bot, chat_id):
|
||||
"""Returns a list of admin IDs for a given chat. Results are cached for 1 hour."""
|
||||
return [admin.user.id for admin in bot.get_chat_administrators(chat_id)]
|
||||
```
|
||||
|
||||
You can then use the function like this:
|
||||
```python
|
||||
if update.effective_user.id in get_admin_ids(context.bot, update.message.chat_id):
|
||||
# admin only
|
||||
```
|
||||
|
||||
**Note:** Private chats and groups with `all_members_are_administrator` flag, are not covered by this snippet. Make sure you handle them.
|
||||
|
||||
---
|
||||
#### Simple way of restarting the bot
|
||||
|
||||
The following example allows you to restart the bot from within a handler. It goes without saying that you should protect this method from access by unauthorized users, which is why we are using a `Filters.user` filter. If you want multiple users to have access the restart command, you can pass a list of usernames as well. You can also filter by user IDs which is arguably a bit safer since they can't change. See the [docs](https://python-telegram-bot.readthedocs.io/en/latest/telegram.ext.filters.html#telegram.ext.filters.Filters.user) for more information.
|
||||
|
||||
This example is using closures so it has access to the `updater` variable. Alternatively, you could make it global.
|
||||
|
||||
<details><summary>Click to expand</summary><p>
|
||||
|
||||
```python
|
||||
import os
|
||||
import sys
|
||||
from threading import Thread
|
||||
|
||||
---
|
||||
# Other code
|
||||
|
||||
def main():
|
||||
updater = Updater("TOKEN", use_context=True)
|
||||
dp = updater.dispatcher
|
||||
|
||||
# Add your other handlers here...
|
||||
|
||||
def stop_and_restart():
|
||||
"""Gracefully stop the Updater and replace the current process with a new one"""
|
||||
updater.stop()
|
||||
os.execl(sys.executable, sys.executable, *sys.argv)
|
||||
|
||||
def restart(update, context):
|
||||
update.message.reply_text('Bot is restarting...')
|
||||
Thread(target=stop_and_restart).start()
|
||||
|
||||
# ...or here...
|
||||
|
||||
dp.add_handler(CommandHandler('r', restart, filters=Filters.user(username='@jh0ker')))
|
||||
|
||||
# ...or here, depending on your preference :)
|
||||
|
||||
updater.start_polling()
|
||||
updater.idle()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
```
|
||||
|
||||
</p></details>
|
||||
|
||||
---
|
||||
#### Store ConversationHandler States
|
||||
|
||||
Version 12 and up includes tools for [[making your bot persistent|Making-your-bot-persistent]].
|
||||
|
||||
---
|
||||
#### Save and load jobs using pickle
|
||||
|
||||
**WARNING:** This snippet was written for v12.x and can't be used with v13+.
|
||||
|
||||
The following snippet pickles the jobs in the job queue periodically and on bot shutdown, and unpickles and queues them again on startup. Since pickle doesn't support threading primitives, therefore their values and states are extracted (this information may change in the future, always check the `Job` documentation).
|
||||
|
||||
**Note:** `Job` is not yet safe for threads so eventually some special condition may occur. In a previous example, the content of `Job` was modified which resulted in some asynchronous processing errors; now the content of `Job` is extracted without modifying it which is much more safe.
|
||||
|
||||
<details><summary>Click to expand</summary><p>
|
||||
|
||||
```python
|
||||
import pickle
|
||||
from time import time
|
||||
from datetime import timedelta
|
||||
|
||||
from telegram.ext import Updater, Job
|
||||
|
||||
|
||||
JOBS_PICKLE = 'job_tuples.pickle'
|
||||
|
||||
---
|
||||
# WARNING: This information may change in future versions (changes are planned)
|
||||
JOB_DATA = ('callback', 'interval', 'repeat', 'context', 'days', 'name', 'tzinfo')
|
||||
JOB_STATE = ('_remove', '_enabled')
|
||||
|
||||
|
||||
def load_jobs(jq):
|
||||
with open(JOBS_PICKLE, 'rb') as fp:
|
||||
while True:
|
||||
try:
|
||||
next_t, data, state = pickle.load(fp)
|
||||
except EOFError:
|
||||
break # loaded all jobs
|
||||
|
||||
# New object with the same data
|
||||
job = Job(**{var: val for var, val in zip(JOB_DATA, data)})
|
||||
|
||||
# Restore the state it had
|
||||
for var, val in zip(JOB_STATE, state):
|
||||
attribute = getattr(job, var)
|
||||
getattr(attribute, 'set' if val else 'clear')()
|
||||
|
||||
job.job_queue = jq
|
||||
|
||||
next_t -= time() # convert from absolute to relative time
|
||||
|
||||
jq._put(job, next_t)
|
||||
|
||||
|
||||
def save_jobs(jq):
|
||||
with jq._queue.mutex: # in case job_queue makes a change
|
||||
|
||||
if jq:
|
||||
job_tuples = jq._queue.queue
|
||||
else:
|
||||
job_tuples = []
|
||||
|
||||
with open(JOBS_PICKLE, 'wb') as fp:
|
||||
for next_t, job in job_tuples:
|
||||
|
||||
# This job is always created at the start
|
||||
if job.name == 'save_jobs_job':
|
||||
continue
|
||||
|
||||
# Threading primitives are not pickleable
|
||||
data = tuple(getattr(job, var) for var in JOB_DATA)
|
||||
state = tuple(getattr(job, var).is_set() for var in JOB_STATE)
|
||||
|
||||
# Pickle the job
|
||||
pickle.dump((next_t, data, state), fp)
|
||||
|
||||
|
||||
def save_jobs_job(context):
|
||||
save_jobs(context.job_queue)
|
||||
|
||||
|
||||
def main():
|
||||
# updater = Updater(...)
|
||||
|
||||
job_queue = updater.job_queue
|
||||
|
||||
# Periodically save jobs
|
||||
job_queue.run_repeating(save_jobs_job, timedelta(minutes=1))
|
||||
|
||||
try:
|
||||
load_jobs(job_queue)
|
||||
|
||||
except FileNotFoundError:
|
||||
# First run
|
||||
pass
|
||||
|
||||
# updater.start_[polling|webhook]()
|
||||
# updater.idle()
|
||||
|
||||
# Run again after bot has been properly shutdown
|
||||
save_jobs(job_queue)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
```
|
||||
|
||||
</p></details>
|
||||
|
||||
---
|
||||
#### Verify data from [Telegram Web Login Widget](https://core.telegram.org/widgets/login).
|
||||
|
||||
When using a [`LoginUrl`](https://core.telegram.org/bots/api#loginurl) in an [`InlineKeyboardButton`](https://core.telegram.org/bots/api#inlinekeyboardbutton) to authorize a user on your website via Telegram, you'll have to to check the hash of the received data to verify the data of the integrity as described [here](https://core.telegram.org/widgets/login#checking-authorization)
|
||||
|
|
Loading…
Reference in a new issue