python-telegram-bot/examples/rawapibot.py
2022-06-09 17:22:32 +02:00

78 lines
2.6 KiB
Python

#!/usr/bin/env python
# pylint: disable=wrong-import-position
"""Simple Bot to reply to Telegram messages.
This is built on the API wrapper, see echobot.py to see the same example built
on the telegram.ext bot framework.
This program is dedicated to the public domain under the CC0 license.
"""
import asyncio
import logging
from typing import NoReturn
from telegram import __version__ as TG_VER
try:
from telegram import __version_info__
except ImportError:
__version_info__ = (0, 0, 0, 0, 0) # type: ignore[assignment] # type: ignore[assignment]
if __version_info__ < (20, 0, 0, "alpha", 1):
raise RuntimeError(
f"This example is not compatible with your current PTB version {TG_VER}. To view the "
f"{TG_VER} version of this example, "
f"visit https://docs.python-telegram-bot.org/en/v{TG_VER}/examples.html"
)
from telegram import Bot
from telegram.error import Forbidden, NetworkError
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
async def main() -> NoReturn:
"""Run the bot."""
# Here we use the `async with` syntax to properly initialize and shutdown resources.
async with Bot("TOKEN") as bot:
# get the first pending update_id, this is so we can skip over it in case
# we get a "Forbidden" exception.
try:
update_id = (await bot.get_updates())[0].update_id
except IndexError:
update_id = None
logger.info("listening for new messages...")
while True:
try:
update_id = await echo(bot, update_id)
except NetworkError:
await asyncio.sleep(1)
except Forbidden:
# The user has removed or blocked the bot.
update_id += 1
async def echo(bot: Bot, update_id: int) -> int:
"""Echo the message the user sent."""
# Request updates after the last update_id
updates = await bot.get_updates(offset=update_id, timeout=10)
for update in updates:
next_update_id = update.update_id + 1
# your bot can receive updates without messages
# and not all messages contain text
if update.message and update.message.text:
# Reply to the message
logger.info("Found message %s!", update.message.text)
await update.message.reply_text(update.message.text)
return next_update_id
return update_id
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt: # Ignore exception when Ctrl-C is pressed
pass