#!/usr/bin/env python # # A library that provides a Python interface to the Telegram Bot API # Copyright (C) 2015-2022 # Leandro Toledo de Souza # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser Public License for more details. # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. """This module contains the class Updater, which tries to make creating Telegram bots intuitive.""" import asyncio import logging import ssl from contextlib import AbstractAsyncContextManager from pathlib import Path from types import TracebackType from typing import TYPE_CHECKING, Callable, Coroutine, List, Optional, Type, TypeVar, Union from telegram._utils.defaultvalue import DEFAULT_NONE from telegram._utils.types import ODVInput from telegram.error import InvalidToken, RetryAfter, TelegramError, TimedOut from telegram.ext._utils.webhookhandler import WebhookAppClass, WebhookServer if TYPE_CHECKING: from telegram import Bot _UpdaterType = TypeVar("_UpdaterType", bound="Updater") # pylint: disable=invalid-name class Updater(AbstractAsyncContextManager): """This class fetches updates for the bot either via long polling or by starting a webhook server. Received updates are enqueued into the :attr:`update_queue` and may be fetched from there to handle them appropriately. Instances of this class can be used as asyncio context managers, where .. code:: python async with updater: # code is roughly equivalent to .. code:: python try: await updater.initialize() # code finally: await updater.shutdown() .. versionchanged:: 20.0 * Removed argument and attribute ``user_sig_handler`` * The only arguments and attributes are now :attr:`bot` and :attr:`update_queue` as now the sole purpose of this class is to fetch updates. The entry point to a PTB application is now :class:`telegram.ext.Application`. Args: bot (:class:`telegram.Bot`): The bot used with this Updater. update_queue (:class:`asyncio.Queue`): Queue for the updates. Attributes: bot (:class:`telegram.Bot`): The bot used with this Updater. update_queue (:class:`asyncio.Queue`): Queue for the updates. """ __slots__ = ( "bot", "_logger", "update_queue", "_last_update_id", "_running", "_initialized", "_httpd", "__lock", "__polling_task", ) def __init__( self, bot: "Bot", update_queue: asyncio.Queue, ): self.bot = bot self.update_queue = update_queue self._last_update_id = 0 self._running = False self._initialized = False self._httpd: Optional[WebhookServer] = None self.__lock = asyncio.Lock() self.__polling_task: Optional[asyncio.Task] = None self._logger = logging.getLogger(__name__) @property def running(self) -> bool: return self._running async def initialize(self) -> None: """Initializes the Updater & the associated :attr:`bot` by calling :meth:`telegram.Bot.initialize`. .. seealso:: :meth:`shutdown` """ if self._initialized: self._logger.debug("This Updater is already initialized.") return await self.bot.initialize() self._initialized = True async def shutdown(self) -> None: """ Shutdown the Updater & the associated :attr:`bot` by calling :meth:`telegram.Bot.shutdown`. .. seealso:: :meth:`initialize` Raises: :exc:`RuntimeError`: If the updater is still running. """ if self.running: raise RuntimeError("This Updater is still running!") if not self._initialized: self._logger.debug("This Updater is already shut down. Returning.") return await self.bot.shutdown() self._initialized = False self._logger.debug("Shut down of Updater complete") async def __aenter__(self: _UpdaterType) -> _UpdaterType: """Simple context manager which initializes the Updater.""" try: await self.initialize() return self except Exception as exc: await self.shutdown() raise exc async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: """Shutdown the Updater from the context manager.""" # Make sure not to return `True` so that exceptions are not suppressed # https://docs.python.org/3/reference/datamodel.html?#object.__aexit__ await self.shutdown() async def start_polling( self, poll_interval: float = 0.0, timeout: int = 10, bootstrap_retries: int = -1, read_timeout: float = 2, write_timeout: ODVInput[float] = DEFAULT_NONE, connect_timeout: ODVInput[float] = DEFAULT_NONE, pool_timeout: ODVInput[float] = DEFAULT_NONE, allowed_updates: List[str] = None, drop_pending_updates: bool = None, error_callback: Callable[[TelegramError], None] = None, ) -> asyncio.Queue: """Starts polling updates from Telegram. .. versionchanged:: 20.0 Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates`. Args: poll_interval (:obj:`float`, optional): Time to wait between polling updates from Telegram in seconds. Default is ``0.0``. timeout (:obj:`float`, optional): Passed to :paramref:`telegram.Bot.get_updates.timeout`. Defaults to ``10`` seconds. bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the :class:`telegram.ext.Updater` will retry on failures on the Telegram server. * < 0 - retry indefinitely (default) * 0 - no retries * > 0 - retry up to X times read_timeout (:obj:`float`, optional): Value to pass to :paramref:`telegram.Bot.get_updates.read_timeout`. Defaults to ``2``. write_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to :paramref:`telegram.Bot.get_updates.write_timeout`. Defaults to :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`. connect_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to :paramref:`telegram.Bot.get_updates.connect_timeout`. Defaults to :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`. pool_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to :paramref:`telegram.Bot.get_updates.pool_timeout`. Defaults to :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`. allowed_updates (List[:obj:`str`], optional): Passed to :meth:`telegram.Bot.get_updates`. drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers before actually starting to poll. Default is :obj:`False`. .. versionadded :: 13.4 error_callback (Callable[[:exc:`telegram.error.TelegramError`], :obj:`None`], \ optional): Callback to handle :exc:`telegram.error.TelegramError` s that occur while calling :meth:`telegram.Bot.get_updates` during polling. Defaults to :obj:`None`, in which case errors will be logged. Callback signature:: def callback(error: telegram.error.TelegramError) Note: The :paramref:`error_callback` must *not* be a :term:`coroutine function`! If asynchronous behavior of the callback is wanted, please schedule a task from within the callback. Returns: :class:`asyncio.Queue`: The update queue that can be filled from the main thread. Raises: :exc:`RuntimeError`: If the updater is already running or was not initialized. """ if error_callback and asyncio.iscoroutinefunction(error_callback): raise TypeError( "The `error_callback` must not be a coroutine function! Use an ordinary function " "instead. " ) async with self.__lock: if self.running: raise RuntimeError("This Updater is already running!") if not self._initialized: raise RuntimeError("This Updater was not initialized via `Updater.initialize`!") self._running = True try: # Create & start tasks polling_ready = asyncio.Event() await self._start_polling( poll_interval=poll_interval, timeout=timeout, read_timeout=read_timeout, write_timeout=write_timeout, connect_timeout=connect_timeout, pool_timeout=pool_timeout, bootstrap_retries=bootstrap_retries, drop_pending_updates=drop_pending_updates, allowed_updates=allowed_updates, ready=polling_ready, error_callback=error_callback, ) self._logger.debug("Waiting for polling to start") await polling_ready.wait() self._logger.debug("Polling updates from Telegram started") return self.update_queue except Exception as exc: self._running = False raise exc async def _start_polling( self, poll_interval: float, timeout: int, read_timeout: float, write_timeout: ODVInput[float], connect_timeout: ODVInput[float], pool_timeout: ODVInput[float], bootstrap_retries: int, drop_pending_updates: Optional[bool], allowed_updates: Optional[List[str]], ready: asyncio.Event, error_callback: Optional[Callable[[TelegramError], None]], ) -> None: self._logger.debug("Updater started (polling)") # the bootstrapping phase does two things: # 1) make sure there is no webhook set # 2) apply drop_pending_updates await self._bootstrap( bootstrap_retries, drop_pending_updates=drop_pending_updates, webhook_url="", allowed_updates=None, ) self._logger.debug("Bootstrap done") async def polling_action_cb() -> bool: try: updates = await self.bot.get_updates( offset=self._last_update_id, timeout=timeout, read_timeout=read_timeout, connect_timeout=connect_timeout, write_timeout=write_timeout, pool_timeout=pool_timeout, allowed_updates=allowed_updates, ) except asyncio.CancelledError as exc: # TODO: in py3.8+, CancelledError is a subclass of BaseException, so we can drop # this clause when we drop py3.7 raise exc except TelegramError as exc: # TelegramErrors should be processed by the network retry loop raise exc except Exception as exc: # Other exceptions should not. Let's log them for now. self._logger.critical( "Something went wrong processing the data received from Telegram. " "Received data was *not* processed!", exc_info=exc, ) return True if updates: if not self.running: self._logger.critical( "Updater stopped unexpectedly. Pulled updates will be ignored and again " "on restart." ) else: for update in updates: await self.update_queue.put(update) self._last_update_id = updates[-1].update_id + 1 # Add one to 'confirm' it return True # Keep fetching updates & don't quit. Polls with poll_interval. def default_error_callback(exc: TelegramError) -> None: self._logger.exception("Exception happened while polling for updates.", exc_info=exc) # Start task that runs in background, pulls # updates from Telegram and inserts them in the update queue of the # Application. self.__polling_task = asyncio.create_task( self._network_loop_retry( action_cb=polling_action_cb, on_err_cb=error_callback or default_error_callback, description="getting Updates", interval=poll_interval, ) ) if ready is not None: ready.set() async def start_webhook( self, listen: str = "127.0.0.1", port: int = 80, url_path: str = "", cert: Union[str, Path] = None, key: Union[str, Path] = None, bootstrap_retries: int = 0, webhook_url: str = None, allowed_updates: List[str] = None, drop_pending_updates: bool = None, ip_address: str = None, max_connections: int = 40, ) -> asyncio.Queue: """ Starts a small http server to listen for updates via webhook. If :paramref:`cert` and :paramref:`key` are not provided, the webhook will be started directly on http://listen:port/url_path, so SSL can be handled by another application. Else, the webhook will be started on https://listen:port/url_path. Also calls :meth:`telegram.Bot.set_webhook` as required. .. versionchanged:: 13.4 :meth:`start_webhook` now *always* calls :meth:`telegram.Bot.set_webhook`, so pass ``webhook_url`` instead of calling ``updater.bot.set_webhook(webhook_url)`` manually. .. versionchanged:: 20.0 Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates` and removed the deprecated argument ``force_event_loop``. Args: listen (:obj:`str`, optional): IP-Address to listen on. Defaults to `127.0.0.1 `_. port (:obj:`int`, optional): Port the bot should be listening on. Must be one of :attr:`telegram.constants.SUPPORTED_WEBHOOK_PORTS`. Defaults to ``80``. url_path (:obj:`str`, optional): Path inside url (http(s)://listen:port/). Defaults to ``''``. cert (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL certificate file. key (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL key file. drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers before actually starting to poll. Default is :obj:`False`. .. versionadded :: 13.4 bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the :class:`telegram.ext.Updater` will retry on failures on the Telegram server. * < 0 - retry indefinitely * 0 - no retries (default) * > 0 - retry up to X times webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind NAT, reverse proxy, etc. Default is derived from :paramref:`listen`, :paramref:`port`, :paramref:`url_path`, :paramref:`cert`, and :paramref:`key`. ip_address (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`. Defaults to :obj:`None`. .. versionadded :: 13.4 allowed_updates (List[:obj:`str`], optional): Passed to :meth:`telegram.Bot.set_webhook`. Defaults to :obj:`None`. max_connections (:obj:`int`, optional): Passed to :meth:`telegram.Bot.set_webhook`. Defaults to ``40``. .. versionadded:: 13.6 Returns: :class:`queue.Queue`: The update queue that can be filled from the main thread. Raises: :exc:`RuntimeError`: If the updater is already running or was not initialized. """ async with self.__lock: if self.running: raise RuntimeError("This Updater is already running!") if not self._initialized: raise RuntimeError("This Updater was not initialized via `Updater.initialize`!") self._running = True try: # Create & start tasks webhook_ready = asyncio.Event() await self._start_webhook( listen=listen, port=port, url_path=url_path, cert=cert, key=key, bootstrap_retries=bootstrap_retries, drop_pending_updates=drop_pending_updates, webhook_url=webhook_url, allowed_updates=allowed_updates, ready=webhook_ready, ip_address=ip_address, max_connections=max_connections, ) self._logger.debug("Waiting for webhook server to start") await webhook_ready.wait() self._logger.debug("Webhook server started") except Exception as exc: self._running = False raise exc # Return the update queue so the main thread can insert updates return self.update_queue async def _start_webhook( self, listen: str, port: int, url_path: str, bootstrap_retries: int, allowed_updates: Optional[List[str]], cert: Union[str, Path] = None, key: Union[str, Path] = None, drop_pending_updates: bool = None, webhook_url: str = None, ready: asyncio.Event = None, ip_address: str = None, max_connections: int = 40, ) -> None: self._logger.debug("Updater thread started (webhook)") if not url_path.startswith("/"): url_path = f"/{url_path}" # Create Tornado app instance app = WebhookAppClass(url_path, self.bot, self.update_queue) # Form SSL Context # An SSLError is raised if the private key does not match with the certificate # Note that we only use the SSL certificate for the WebhookServer, if the key is also # present. This is because the WebhookServer may not actually be in charge of performing # the SSL handshake, e.g. in case a reverse proxy is used if cert is not None and key is not None: try: ssl_ctx: Optional[ssl.SSLContext] = ssl.create_default_context( ssl.Purpose.CLIENT_AUTH ) ssl_ctx.load_cert_chain(cert, key) # type: ignore[union-attr] except ssl.SSLError as exc: raise TelegramError("Invalid SSL Certificate") from exc else: ssl_ctx = None # Create and start server self._httpd = WebhookServer(listen, port, app, ssl_ctx) if not webhook_url: webhook_url = self._gen_webhook_url( protocol="https" if ssl_ctx else "http", listen=listen, port=port, url_path=url_path, ) # We pass along the cert to the webhook if present. await self._bootstrap( # Passing a Path or string only works if the bot is running against a local bot API # server, so let's read the contents cert=Path(cert).read_bytes() if cert else None, max_retries=bootstrap_retries, drop_pending_updates=drop_pending_updates, webhook_url=webhook_url, allowed_updates=allowed_updates, ip_address=ip_address, max_connections=max_connections, ) await self._httpd.serve_forever(ready=ready) @staticmethod def _gen_webhook_url(protocol: str, listen: str, port: int, url_path: str) -> str: # TODO: double check if this should be https in any case - the docs of start_webhook # say differently! return f"{protocol}://{listen}:{port}{url_path}" async def _network_loop_retry( self, action_cb: Callable[..., Coroutine], on_err_cb: Callable[[TelegramError], None], description: str, interval: float, ) -> None: """Perform a loop calling `action_cb`, retrying after network errors. Stop condition for loop: `self.running` evaluates :obj:`False` or return value of `action_cb` evaluates :obj:`False`. Args: action_cb (:term:`coroutine function`): Network oriented callback function to call. on_err_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives the exception object as a parameter. description (:obj:`str`): Description text to use for logs and exception raised. interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to `action_cb`. """ self._logger.debug("Start network loop retry %s", description) cur_interval = interval while self.running: try: try: if not await action_cb(): break except RetryAfter as exc: self._logger.info("%s", exc) cur_interval = 0.5 + exc.retry_after except TimedOut as toe: self._logger.debug("Timed out %s: %s", description, toe) # If failure is due to timeout, we should retry asap. cur_interval = 0 except InvalidToken as pex: self._logger.error("Invalid token; aborting") raise pex except TelegramError as telegram_exc: self._logger.error("Error while %s: %s", description, telegram_exc) on_err_cb(telegram_exc) # increase waiting times on subsequent errors up to 30secs cur_interval = 1 if cur_interval == 0 else min(30, 1.5 * cur_interval) else: cur_interval = interval if cur_interval: await asyncio.sleep(cur_interval) except asyncio.CancelledError: self._logger.debug("Network loop retry %s was cancelled", description) break async def _bootstrap( self, max_retries: int, webhook_url: Optional[str], allowed_updates: Optional[List[str]], drop_pending_updates: bool = None, cert: Optional[bytes] = None, bootstrap_interval: float = 1, ip_address: str = None, max_connections: int = 40, ) -> None: """Prepares the setup for fetching updates: delete or set the webhook and drop pending updates if appropriate. If there are unsuccessful attempts, this will retry as specified by :paramref:`max_retries`. """ retries = 0 async def bootstrap_del_webhook() -> bool: self._logger.debug("Deleting webhook") if drop_pending_updates: self._logger.debug("Dropping pending updates from Telegram server") await self.bot.delete_webhook(drop_pending_updates=drop_pending_updates) return False async def bootstrap_set_webhook() -> bool: self._logger.debug("Setting webhook") if drop_pending_updates: self._logger.debug("Dropping pending updates from Telegram server") await self.bot.set_webhook( url=webhook_url, certificate=cert, allowed_updates=allowed_updates, ip_address=ip_address, drop_pending_updates=drop_pending_updates, max_connections=max_connections, ) return False def bootstrap_on_err_cb(exc: Exception) -> None: # We need this since retries is an immutable object otherwise and the changes # wouldn't propagate outside of thi function nonlocal retries if not isinstance(exc, InvalidToken) and (max_retries < 0 or retries < max_retries): retries += 1 self._logger.warning( "Failed bootstrap phase; try=%s max_retries=%s", retries, max_retries ) else: self._logger.error("Failed bootstrap phase after %s retries (%s)", retries, exc) raise exc # Dropping pending updates from TG can be efficiently done with the drop_pending_updates # parameter of delete/start_webhook, even in the case of polling. Also, we want to make # sure that no webhook is configured in case of polling, so we just always call # delete_webhook for polling if drop_pending_updates or not webhook_url: await self._network_loop_retry( bootstrap_del_webhook, bootstrap_on_err_cb, "bootstrap del webhook", bootstrap_interval, ) # Reset the retries counter for the next _network_loop_retry call retries = 0 # Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set, # so we set it anyhow. if webhook_url: await self._network_loop_retry( bootstrap_set_webhook, bootstrap_on_err_cb, "bootstrap set webhook", bootstrap_interval, ) async def stop(self) -> None: """Stops the polling/webhook. .. seealso:: :meth:`start_polling`, :meth:`start_webhook` Raises: :exc:`RuntimeError`: If the updater is not running. """ async with self.__lock: if not self.running: raise RuntimeError("This Updater is not running!") self._logger.debug("Stopping Updater") self._running = False await self._stop_httpd() await self._stop_polling() self._logger.debug("Updater.stop() is complete") async def _stop_httpd(self) -> None: """Stops the Webhook server by calling ``WebhookServer.shutdown()``""" if self._httpd: self._logger.debug("Waiting for current webhook connection to be closed.") await self._httpd.shutdown() self._httpd = None async def _stop_polling(self) -> None: """Stops the polling task by awaiting it.""" if self.__polling_task: self._logger.debug("Waiting background polling task to finish up.") self.__polling_task.cancel() try: await self.__polling_task except asyncio.CancelledError: # This only happens in rare edge-cases, e.g. when `stop()` is called directly # after start_polling(), but lets better be safe than sorry ... pass self.__polling_task = None