mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-11-21 22:56:38 +01:00
195 lines
6.9 KiB
Python
195 lines
6.9 KiB
Python
#!/usr/bin/env python
|
|
# This program is dedicated to the public domain under the CC0 license.
|
|
# pylint: disable=import-error,wrong-import-position
|
|
"""
|
|
Simple example of a bot that uses a custom webhook setup and handles custom updates.
|
|
For the custom webhook setup, the libraries `starlette` and `uvicorn` are used. Please install
|
|
them as `pip install starlette~=0.20.0 uvicorn~=0.17.0`.
|
|
Note that any other `asyncio` based web server framework can be used for a custom webhook setup
|
|
just as well.
|
|
|
|
Usage:
|
|
Set bot token, url, admin chat_id and port at the start of the `main` function.
|
|
You may also need to change the `host` value in the uvicorn configuration to match your setup.
|
|
Press Ctrl-C on the command line or send a signal to the process to stop the bot.
|
|
"""
|
|
import asyncio
|
|
import html
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from http import HTTPStatus
|
|
|
|
import uvicorn
|
|
from starlette.applications import Starlette
|
|
from starlette.requests import Request
|
|
from starlette.responses import PlainTextResponse, Response
|
|
from starlette.routing import Route
|
|
|
|
from telegram import __version__ as TG_VER
|
|
|
|
# If the installed `telegram` package does not export `__version_info__`, fall back to an
# all-zero tuple so that the comparison below rejects it.
try:
    from telegram import __version_info__
except ImportError:
    __version_info__ = (0, 0, 0, 0, 0)  # type: ignore[assignment]

# Abort early with a pointer to the matching example when the installed PTB version is
# older than 20.0a1.
if __version_info__ < (20, 0, 0, "alpha", 1):
    raise RuntimeError(
        f"This example is not compatible with your current PTB version {TG_VER}. To view the "
        f"{TG_VER} version of this example, "
        f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
    )
|
|
|
|
from telegram import Update
|
|
from telegram.constants import ParseMode
|
|
from telegram.ext import (
|
|
Application,
|
|
CallbackContext,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
ExtBot,
|
|
TypeHandler,
|
|
)
|
|
|
|
# Configure logging: INFO and above, with timestamp, logger name and level in each record
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# httpx would otherwise log every GET and POST request, so only let its warnings through
logging.getLogger("httpx").setLevel(logging.WARNING)

# Logger for this module
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class WebhookUpdate:
    """Custom update type that wraps the values submitted to the `/submitpayload` route."""

    # ID of the user the payload is attributed to
    user_id: int
    # Payload text as received from the request
    payload: str
|
|
|
|
|
|
class CustomContext(CallbackContext[ExtBot, dict, dict, dict]):
    """
    CallbackContext subclass that additionally provides `user_data` for updates of type
    `WebhookUpdate`.
    """

    @classmethod
    def from_update(
        cls,
        update: object,
        application: "Application",
    ) -> "CustomContext":
        """Build the context object for `update`.

        A `WebhookUpdate` is constructed directly from its `user_id`; every other update
        is delegated to the parent implementation.
        """
        if not isinstance(update, WebhookUpdate):
            # Anything that is not a custom update goes through the stock code path
            return super().from_update(update, application)
        return cls(application=application, user_id=update.user_id)
|
|
|
|
|
|
async def start(update: Update, context: CustomContext) -> None:
    """Reply with instructions on how to reach the health check and payload endpoints."""
    base_url = context.bot_data["url"]
    # The placeholders contain `<`, `>` and `&`, so escape them before embedding in HTML
    submit_url = html.escape(
        f"{base_url}/submitpayload?user_id=<your user id>&payload=<payload>"
    )
    health_hint = (
        f"To check if the bot is still running, call <code>{base_url}/healthcheck</code>.\n\n"
    )
    submit_hint = f"To post a custom update, call <code>{submit_url}</code>."
    await update.message.reply_html(text=health_hint + submit_hint)
|
|
|
|
|
|
async def webhook_update(update: WebhookUpdate, context: CustomContext) -> None:
    """Callback that handles the custom updates.

    Appends the payload to the list kept under ``user_data["payloads"]`` and sends a message
    listing all payloads stored there to the chat given by ``bot_data["admin_chat_id"]``.

    Args:
        update: The custom update carrying the `user_id` and the `payload`.
        context: The context whose `bot`, `user_data` and `bot_data` are used.
    """
    chat_member = await context.bot.get_chat_member(chat_id=update.user_id, user_id=update.user_id)
    payloads = context.user_data.setdefault("payloads", [])
    # Store the payload unmodified; escaping is applied only when rendering the message
    payloads.append(update.payload)
    # Payloads originate from an HTTP request and are embedded into an HTML-formatted
    # message, so `<`, `>` and `&` must be escaped to keep the markup valid.
    combined_payloads = "</code>\n• <code>".join(
        html.escape(payload, quote=False) for payload in payloads
    )
    text = (
        f"The user {chat_member.user.mention_html()} has sent a new payload. "
        f"So far they have sent the following payloads: \n\n• <code>{combined_payloads}</code>"
    )
    await context.bot.send_message(
        chat_id=context.bot_data["admin_chat_id"], text=text, parse_mode=ParseMode.HTML
    )
|
|
|
|
|
|
async def main() -> None:
    """Set up the application and a custom webserver, then run both until the server exits."""
    url = "https://domain.tld"
    admin_chat_id = 123456
    port = 8000

    context_types = ContextTypes(context=CustomContext)
    # Here we set updater to None because we want our custom webhook server to handle the updates
    # and hence we don't need an Updater instance
    application = (
        Application.builder().token("TOKEN").updater(None).context_types(context_types).build()
    )
    # save the values in `bot_data` such that we may easily access them in the callbacks
    application.bot_data["url"] = url
    application.bot_data["admin_chat_id"] = admin_chat_id

    # register handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(TypeHandler(type=WebhookUpdate, callback=webhook_update))

    # Pass webhook settings to telegram
    await application.bot.set_webhook(url=f"{url}/telegram", allowed_updates=Update.ALL_TYPES)

    # Set up webserver
    async def telegram(request: Request) -> Response:
        """Handle incoming Telegram updates by putting them into the `update_queue`"""
        await application.update_queue.put(
            Update.de_json(data=await request.json(), bot=application.bot)
        )
        return Response()

    async def custom_updates(request: Request) -> PlainTextResponse:
        """
        Handle incoming webhook updates by also putting them into the `update_queue` if
        the required parameters were passed correctly.

        Responds with 400 Bad Request when a query parameter is missing or when `user_id`
        cannot be converted to an integer.
        """
        try:
            user_id = int(request.query_params["user_id"])
            payload = request.query_params["payload"]
        except KeyError:
            return PlainTextResponse(
                status_code=HTTPStatus.BAD_REQUEST,
                content="Please pass both `user_id` and `payload` as query parameters.",
            )
        except ValueError:
            # Raised by `int(...)` above, i.e. the passed `user_id` is not a valid integer
            return PlainTextResponse(
                status_code=HTTPStatus.BAD_REQUEST,
                content="The `user_id` must be an integer!",
            )

        await application.update_queue.put(WebhookUpdate(user_id=user_id, payload=payload))
        return PlainTextResponse("Thank you for the submission! It's being forwarded.")

    async def health(_: Request) -> PlainTextResponse:
        """For the health endpoint, reply with a simple plain text message."""
        return PlainTextResponse(content="The bot is still running fine :)")

    starlette_app = Starlette(
        routes=[
            Route("/telegram", telegram, methods=["POST"]),
            Route("/healthcheck", health, methods=["GET"]),
            Route("/submitpayload", custom_updates, methods=["POST", "GET"]),
        ]
    )
    webserver = uvicorn.Server(
        config=uvicorn.Config(
            app=starlette_app,
            port=port,
            use_colors=False,
            host="127.0.0.1",
        )
    )

    # Run application and webserver together
    async with application:
        await application.start()
        try:
            await webserver.serve()
        finally:
            # Stop the application even if the webserver exits with an exception
            await application.stop()
|
|
|
|
|
|
if __name__ == "__main__":
    # Start the bot only when this file is executed as a script
    asyncio.run(main())
|