python-telegram-bot/telegram/ext/_updater.py
2024-04-12 12:39:38 +02:00

910 lines
37 KiB
Python

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2024
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the class Updater, which tries to make creating Telegram bots intuitive."""
import asyncio
import contextlib
import ssl
from pathlib import Path
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Callable,
Coroutine,
List,
Optional,
Type,
TypeVar,
Union,
)
from telegram._utils.defaultvalue import DEFAULT_80, DEFAULT_IP, DEFAULT_NONE, DefaultValue
from telegram._utils.logging import get_logger
from telegram._utils.repr import build_repr_with_selected_attrs
from telegram._utils.types import DVType, ODVInput
from telegram.error import InvalidToken, RetryAfter, TelegramError, TimedOut
# The webhook server lives behind the optional ``webhooks`` extra. Record whether its import
# succeeded so that `Updater.start_webhook` can raise a helpful error at call time instead of
# failing when this module is imported.
try:
    from telegram.ext._utils.webhookhandler import WebhookAppClass, WebhookServer

    WEBHOOKS_AVAILABLE = True
except ImportError:
    WEBHOOKS_AVAILABLE = False

if TYPE_CHECKING:
    # Only needed for annotations
    from socket import socket

    from telegram import Bot

# Type variable bound to `Updater`; used to annotate `Updater.__aenter__`
_UpdaterType = TypeVar("_UpdaterType", bound="Updater") # pylint: disable=invalid-name
# Module-wide logger used by all methods of `Updater`
_LOGGER = get_logger(__name__)
class Updater(AsyncContextManager["Updater"]):
    """This class fetches updates for the bot either via long polling or by starting a webhook
    server. Received updates are enqueued into the :attr:`update_queue` and may be fetched from
    there to handle them appropriately.

    Instances of this class can be used as asyncio context managers, where

    .. code:: python

        async with updater:
            # code

    is roughly equivalent to

    .. code:: python

        try:
            await updater.initialize()
            # code
        finally:
            await updater.shutdown()

    .. seealso:: :meth:`__aenter__` and :meth:`__aexit__`.

    .. seealso:: :wiki:`Architecture Overview <Architecture>`,
        :wiki:`Builder Pattern <Builder-Pattern>`

    .. versionchanged:: 20.0

        * Removed argument and attribute ``user_sig_handler``
        * The only arguments and attributes are now :attr:`bot` and :attr:`update_queue` as now
          the sole purpose of this class is to fetch updates. The entry point to a PTB application
          is now :class:`telegram.ext.Application`.

    Args:
        bot (:class:`telegram.Bot`): The bot used with this Updater.
        update_queue (:class:`asyncio.Queue`): Queue for the updates.

    Attributes:
        bot (:class:`telegram.Bot`): The bot used with this Updater.
        update_queue (:class:`asyncio.Queue`): Queue for the updates.

    """

    # Lists exactly the instance attributes that are assigned in `__init__`.
    __slots__ = (
        "__lock",
        "__polling_cleanup_cb",
        "__polling_task",
        "__polling_task_stop_event",
        "_httpd",
        "_initialized",
        "_last_update_id",
        "_running",
        "bot",
        "update_queue",
    )
def __init__(
self,
bot: "Bot",
update_queue: "asyncio.Queue[object]",
):
self.bot: Bot = bot
self.update_queue: asyncio.Queue[object] = update_queue
self._last_update_id = 0
self._running = False
self._initialized = False
self._httpd: Optional[WebhookServer] = None
self.__lock = asyncio.Lock()
self.__polling_task: Optional[asyncio.Task] = None
self.__polling_task_stop_event: asyncio.Event = asyncio.Event()
self.__polling_cleanup_cb: Optional[Callable[[], Coroutine[Any, Any, None]]] = None
async def __aenter__(self: _UpdaterType) -> _UpdaterType: # noqa: PYI019
    """
    |async_context_manager| :meth:`initializes <initialize>` the Updater.

    Returns:
        The initialized Updater instance.

    Raises:
        :exc:`Exception`: If an exception is raised during initialization, :meth:`shutdown`
            is called in this case.
    """
    try:
        await self.initialize()
    except Exception as exc:
        # Undo whatever part of the initialization already happened, then pass the
        # original error on to the caller
        await self.shutdown()
        raise exc

    return self
async def __aexit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """|async_context_manager| :meth:`shuts down <shutdown>` the Updater.

    The exception details passed in by the ``async with`` statement are not inspected.
    """
    # Make sure not to return `True` so that exceptions are not suppressed
    # https://docs.python.org/3/reference/datamodel.html?#object.__aexit__
    await self.shutdown()
def __repr__(self) -> str:
    """Give a string representation of the updater in the form ``Updater[bot=...]``.

    As this class doesn't implement :meth:`object.__str__`, the default implementation
    will be used, which is equivalent to :meth:`__repr__`.

    Returns:
        :obj:`str`
    """
    # Only the bot is included in the representation
    return build_repr_with_selected_attrs(self, bot=self.bot)
@property
def running(self) -> bool:
    """:obj:`bool`: Whether the updater is currently running. The flag is set by
    :meth:`start_polling` and :meth:`start_webhook` and reset by :meth:`stop` (or when
    starting fails).
    """
    return self._running
async def initialize(self) -> None:
    """Initializes the Updater & the associated :attr:`bot` by calling
    :meth:`telegram.Bot.initialize`. Calling this on an already initialized Updater only
    emits a debug log message.

    .. seealso::
        :meth:`shutdown`
    """
    if not self._initialized:
        await self.bot.initialize()
        # Only flag as initialized once the bot was set up without errors
        self._initialized = True
        return

    _LOGGER.debug("This Updater is already initialized.")
async def shutdown(self) -> None:
    """
    Shutdown the Updater & the associated :attr:`bot` by calling :meth:`telegram.Bot.shutdown`.
    Calling this on an Updater that is not initialized only emits a debug log message.

    .. seealso::
        :meth:`initialize`

    Raises:
        :exc:`RuntimeError`: If the updater is still running.
    """
    if self.running:
        raise RuntimeError("This Updater is still running!")

    if self._initialized:
        await self.bot.shutdown()
        # Only flag as shut down once the bot was shut down without errors
        self._initialized = False
        _LOGGER.debug("Shut down of Updater complete")
    else:
        _LOGGER.debug("This Updater is already shut down. Returning.")
async def start_polling(
    self,
    poll_interval: float = 0.0,
    timeout: int = 10,
    bootstrap_retries: int = -1,
    read_timeout: ODVInput[float] = DEFAULT_NONE,
    write_timeout: ODVInput[float] = DEFAULT_NONE,
    connect_timeout: ODVInput[float] = DEFAULT_NONE,
    pool_timeout: ODVInput[float] = DEFAULT_NONE,
    allowed_updates: Optional[List[str]] = None,
    drop_pending_updates: Optional[bool] = None,
    error_callback: Optional[Callable[[TelegramError], None]] = None,
) -> "asyncio.Queue[object]":
    """Starts polling updates from Telegram.

    .. versionchanged:: 20.0
        Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates`.

    Args:
        poll_interval (:obj:`float`, optional): Time to wait between polling updates from
            Telegram in seconds. Default is ``0.0``.
        timeout (:obj:`int`, optional): Passed to
            :paramref:`telegram.Bot.get_updates.timeout`. Defaults to ``10`` seconds.
        bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
            :class:`telegram.ext.Updater` will retry on failures on the Telegram server.

            * < 0 - retry indefinitely (default)
            *   0 - no retries
            * > 0 - retry up to X times
        read_timeout (:obj:`float`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.read_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. versionchanged:: 20.7
                Defaults to :attr:`~telegram.request.BaseRequest.DEFAULT_NONE` instead of
                ``2``.
            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_read_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        write_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.write_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_write_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        connect_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.connect_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_connect_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        pool_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.pool_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_pool_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        allowed_updates (List[:obj:`str`], optional): Passed to
            :meth:`telegram.Bot.get_updates`.
        drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
            Telegram servers before actually starting to poll. Default is :obj:`False`.

            .. versionadded :: 13.4
        error_callback (Callable[[:exc:`telegram.error.TelegramError`], :obj:`None`], \
            optional): Callback to handle :exc:`telegram.error.TelegramError` s that occur
            while calling :meth:`telegram.Bot.get_updates` during polling. Defaults to
            :obj:`None`, in which case errors will be logged. Callback signature::

                def callback(error: telegram.error.TelegramError)

            Note:
                The :paramref:`error_callback` must *not* be a :term:`coroutine function`! If
                asynchronous behavior of the callback is wanted, please schedule a task from
                within the callback.

    Returns:
        :class:`asyncio.Queue`: The update queue that can be filled from the main thread.

    Raises:
        :exc:`RuntimeError`: If the updater is already running or was not initialized.
        :exc:`TypeError`: If :paramref:`error_callback` is a :term:`coroutine function`.

    """
    # We refrain from issuing deprecation warnings for the timeout parameters here, as we
    # already issue them in `Application`. This means that there are no warnings when using
    # `Updater` without `Application`, but this is a rather special use case.

    if error_callback and asyncio.iscoroutinefunction(error_callback):
        raise TypeError(
            "The `error_callback` must not be a coroutine function! Use an ordinary function "
            "instead. "
        )

    async with self.__lock:
        if self.running:
            raise RuntimeError("This Updater is already running!")
        if not self._initialized:
            raise RuntimeError("This Updater was not initialized via `Updater.initialize`!")

        # Flag as running up front; rolled back below if starting fails
        self._running = True

        try:
            # Set by `_start_polling` once the background task has been launched
            started = asyncio.Event()

            await self._start_polling(
                poll_interval=poll_interval,
                timeout=timeout,
                bootstrap_retries=bootstrap_retries,
                drop_pending_updates=drop_pending_updates,
                allowed_updates=allowed_updates,
                read_timeout=read_timeout,
                write_timeout=write_timeout,
                connect_timeout=connect_timeout,
                pool_timeout=pool_timeout,
                error_callback=error_callback,
                ready=started,
            )

            _LOGGER.debug("Waiting for polling to start")
            await started.wait()
            _LOGGER.debug("Polling updates from Telegram started")
        except Exception as exc:
            self._running = False
            raise exc

        return self.update_queue
async def _start_polling(
    self,
    poll_interval: float,
    timeout: int,
    read_timeout: ODVInput[float],
    write_timeout: ODVInput[float],
    connect_timeout: ODVInput[float],
    pool_timeout: ODVInput[float],
    bootstrap_retries: int,
    drop_pending_updates: Optional[bool],
    allowed_updates: Optional[List[str]],
    ready: asyncio.Event,
    error_callback: Optional[Callable[[TelegramError], None]],
) -> None:
    """Runs the bootstrap phase for polling and launches the background polling task.

    After bootstrapping, a task running :meth:`_network_loop_retry` is created that keeps
    calling :meth:`telegram.Bot.get_updates` and puts the fetched updates into
    :attr:`update_queue`. In addition, a cleanup callback is stored which is awaited by
    :meth:`_stop_polling` after the polling task has finished.

    Args:
        poll_interval: Interval between two polling calls, passed on to
            :meth:`_network_loop_retry`.
        timeout: Long polling timeout passed to ``get_updates``.
        read_timeout: Passed to ``get_updates``.
        write_timeout: Passed to ``get_updates``.
        connect_timeout: Passed to ``get_updates``.
        pool_timeout: Passed to ``get_updates``.
        bootstrap_retries: Passed to :meth:`_bootstrap` as number of retries.
        drop_pending_updates: Passed to :meth:`_bootstrap`.
        allowed_updates: Passed to ``get_updates``.
        ready: Event that is set once the polling task and the cleanup callback are set up.
        error_callback: Called with any :exc:`telegram.error.TelegramError` that is handled by
            the generic error branch of :meth:`_network_loop_retry`. If :obj:`None`, such
            errors are logged instead.
    """
    _LOGGER.debug("Updater started (polling)")

    # the bootstrapping phase does two things:
    # 1) make sure there is no webhook set
    # 2) apply drop_pending_updates
    await self._bootstrap(
        bootstrap_retries,
        drop_pending_updates=drop_pending_updates,
        webhook_url="",
        allowed_updates=None,
    )

    _LOGGER.debug("Bootstrap done")

    async def polling_action_cb() -> bool:
        try:
            updates = await self.bot.get_updates(
                offset=self._last_update_id,
                timeout=timeout,
                read_timeout=read_timeout,
                connect_timeout=connect_timeout,
                write_timeout=write_timeout,
                pool_timeout=pool_timeout,
                allowed_updates=allowed_updates,
            )
        except TelegramError as exc:
            # TelegramErrors should be processed by the network retry loop
            raise exc
        except Exception as exc:
            # Other exceptions should not. Let's log them for now.
            _LOGGER.critical(
                "Something went wrong processing the data received from Telegram. "
                "Received data was *not* processed!",
                exc_info=exc,
            )
            return True

        if updates:
            if not self.running:
                _LOGGER.critical(
                    "Updater stopped unexpectedly. Pulled updates will be ignored and pulled "
                    "again on restart."
                )
            else:
                for update in updates:
                    await self.update_queue.put(update)
                self._last_update_id = updates[-1].update_id + 1  # Add one to 'confirm' it

        return True  # Keep fetching updates & don't quit. Polls with poll_interval.

    def default_error_callback(exc: TelegramError) -> None:
        _LOGGER.exception("Exception happened while polling for updates.", exc_info=exc)

    # Start task that runs in background, pulls
    # updates from Telegram and inserts them in the update queue of the
    # Application.
    self.__polling_task = asyncio.create_task(
        self._network_loop_retry(
            action_cb=polling_action_cb,
            on_err_cb=error_callback or default_error_callback,
            description="getting Updates",
            interval=poll_interval,
            stop_event=self.__polling_task_stop_event,
        ),
        name="Updater:start_polling:polling_task",
    )

    # Prepare a cleanup callback to await on _stop_polling
    # Calling get_updates one more time with the latest `offset` parameter ensures that
    # all updates that where put into the update queue are also marked as "read" to TG,
    # so we do not receive them again on the next startup
    # We define this here so that we can use the same parameters as in the polling task
    async def _get_updates_cleanup() -> None:
        _LOGGER.debug(
            "Calling `get_updates` one more time to mark all fetched updates as read."
        )
        try:
            await self.bot.get_updates(
                offset=self._last_update_id,
                # We don't want to do long polling here!
                timeout=0,
                read_timeout=read_timeout,
                connect_timeout=connect_timeout,
                write_timeout=write_timeout,
                pool_timeout=pool_timeout,
                allowed_updates=allowed_updates,
            )
        except TelegramError as exc:
            # The message contains a `%s` placeholder, so the exception must be passed as
            # positional argument. Without it, `logging` skips the %-formatting and emits
            # a literal "%s".
            _LOGGER.error(
                "Error while calling `get_updates` one more time to mark all fetched updates "
                "as read: %s. Suppressing error to ensure graceful shutdown. When polling for "
                "updates is restarted, updates may be fetched again. Please adjust timeouts "
                "via `ApplicationBuilder` or the parameter `get_updates_request` of `Bot`.",
                exc,
                exc_info=exc,
            )

    self.__polling_cleanup_cb = _get_updates_cleanup

    if ready is not None:
        ready.set()
async def start_webhook(
    self,
    listen: DVType[str] = DEFAULT_IP,
    port: DVType[int] = DEFAULT_80,
    url_path: str = "",
    cert: Optional[Union[str, Path]] = None,
    key: Optional[Union[str, Path]] = None,
    bootstrap_retries: int = 0,
    webhook_url: Optional[str] = None,
    allowed_updates: Optional[List[str]] = None,
    drop_pending_updates: Optional[bool] = None,
    ip_address: Optional[str] = None,
    max_connections: int = 40,
    secret_token: Optional[str] = None,
    unix: Optional[Union[str, Path, "socket"]] = None,
) -> "asyncio.Queue[object]":
    """
    Starts a small http server to listen for updates via webhook. If :paramref:`cert`
    and :paramref:`key` are not provided, the webhook will be started directly on
    ``http://listen:port/url_path``, so SSL can be handled by another
    application. Else, the webhook will be started on
    ``https://listen:port/url_path``. Also calls :meth:`telegram.Bot.set_webhook` as required.

    Important:
        If you want to use this method, you must install PTB with the optional requirement
        ``webhooks``, i.e.

        .. code-block:: bash

           pip install "python-telegram-bot[webhooks]"

    .. seealso:: :wiki:`Webhooks`

    .. versionchanged:: 13.4
        :meth:`start_webhook` now *always* calls :meth:`telegram.Bot.set_webhook`, so pass
        ``webhook_url`` instead of calling ``updater.bot.set_webhook(webhook_url)`` manually.
    .. versionchanged:: 20.0

        * Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates` and
          removed the deprecated argument ``force_event_loop``.

    Args:
        listen (:obj:`str`, optional): IP-Address to listen on. Defaults to
            `127.0.0.1 <https://en.wikipedia.org/wiki/Localhost>`_.
        port (:obj:`int`, optional): Port the bot should be listening on. Must be one of
            :attr:`telegram.constants.SUPPORTED_WEBHOOK_PORTS` unless the bot is running
            behind a proxy. Defaults to ``80``.
        url_path (:obj:`str`, optional): Path inside url (http(s)://listen:port/<url_path>).
            Defaults to ``''``.
        cert (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL certificate file.
        key (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL key file.
        drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
            Telegram servers before actually starting the webhook server. Default is
            :obj:`False`.

            .. versionadded :: 13.4
        bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
            :class:`telegram.ext.Updater` will retry on failures on the Telegram server.

            * < 0 - retry indefinitely
            *   0 - no retries (default)
            * > 0 - retry up to X times
        webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind
            NAT, reverse proxy, etc. Default is derived from :paramref:`listen`,
            :paramref:`port`, :paramref:`url_path`, :paramref:`cert`, and :paramref:`key`.
        ip_address (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
            Defaults to :obj:`None`.

            .. versionadded :: 13.4
        allowed_updates (List[:obj:`str`], optional): Passed to
            :meth:`telegram.Bot.set_webhook`. Defaults to :obj:`None`.
        max_connections (:obj:`int`, optional): Passed to
            :meth:`telegram.Bot.set_webhook`. Defaults to ``40``.

            .. versionadded:: 13.6
        secret_token (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
            Defaults to :obj:`None`.

            When added, the web server started by this call will expect the token to be set in
            the ``X-Telegram-Bot-Api-Secret-Token`` header of an incoming request and will
            raise a :class:`http.HTTPStatus.FORBIDDEN <http.HTTPStatus>` error if either the
            header isn't set or it is set to a wrong token.

            .. versionadded:: 20.0
        unix (:class:`pathlib.Path` | :obj:`str` | :class:`socket.socket`, optional): Can be
            either:

            * the path to the unix socket file as :class:`pathlib.Path` or :obj:`str`. This
              will be passed to `tornado.netutil.bind_unix_socket <https://www.tornadoweb.org/
              en/stable/netutil.html#tornado.netutil.bind_unix_socket>`_ to create the socket.
              If the Path does not exist, the file will be created.

            * or the socket itself. This option allows you to e.g. restrict the permissions of
              the socket for improved security. Note that you need to pass the correct family,
              type and socket options yourself.

            Caution:
                This parameter is a replacement for the default TCP bind. Therefore, it is
                mutually exclusive with :paramref:`listen` and :paramref:`port`. When using
                this param, you must also run a reverse proxy to the unix socket and set the
                appropriate :paramref:`webhook_url`.

            .. versionadded:: 20.8
            .. versionchanged:: 21.1
                Added support to pass a socket instance itself.

    Returns:
        :class:`asyncio.Queue`: The update queue that can be filled from the main thread.

    Raises:
        :exc:`RuntimeError`: If the updater is already running or was not initialized, if the
            optional ``webhooks`` requirement is not installed, or if :paramref:`unix` is
            combined with :paramref:`listen` or :paramref:`port` or used without
            :paramref:`webhook_url`.
    """
    if not WEBHOOKS_AVAILABLE:
        raise RuntimeError(
            "To use `start_webhook`, PTB must be installed via `pip install "
            '"python-telegram-bot[webhooks]"`.'
        )
    # unix has special requirements what must and mustn't be set when using it
    if unix:
        error_msg = (
            "You can not pass unix and {0}, only use one. Unix if you want to "
            "initialize a unix socket, or {0} for a standard TCP server."
        )
        # `listen` and `port` still being `DefaultValue` instances means that the caller
        # did not pass them explicitly
        if not isinstance(listen, DefaultValue):
            raise RuntimeError(error_msg.format("listen"))
        if not isinstance(port, DefaultValue):
            raise RuntimeError(error_msg.format("port"))
        if not webhook_url:
            raise RuntimeError(
                "Since you set unix, you also need to set the URL to the webhook "
                "of the proxy you run in front of the unix socket."
            )

    async with self.__lock:
        if self.running:
            raise RuntimeError("This Updater is already running!")
        if not self._initialized:
            raise RuntimeError("This Updater was not initialized via `Updater.initialize`!")

        # Flag as running up front; rolled back below if starting fails
        self._running = True

        try:
            # Create & start tasks
            webhook_ready = asyncio.Event()

            await self._start_webhook(
                listen=DefaultValue.get_value(listen),
                port=DefaultValue.get_value(port),
                url_path=url_path,
                cert=cert,
                key=key,
                bootstrap_retries=bootstrap_retries,
                drop_pending_updates=drop_pending_updates,
                webhook_url=webhook_url,
                allowed_updates=allowed_updates,
                ready=webhook_ready,
                ip_address=ip_address,
                max_connections=max_connections,
                secret_token=secret_token,
                unix=unix,
            )

            _LOGGER.debug("Waiting for webhook server to start")
            await webhook_ready.wait()
            _LOGGER.debug("Webhook server started")
        except Exception as exc:
            self._running = False
            raise exc

        # Return the update queue so the main thread can insert updates
        return self.update_queue
async def _start_webhook(
    self,
    listen: str,
    port: int,
    url_path: str,
    bootstrap_retries: int,
    allowed_updates: Optional[List[str]],
    cert: Optional[Union[str, Path]] = None,
    key: Optional[Union[str, Path]] = None,
    drop_pending_updates: Optional[bool] = None,
    webhook_url: Optional[str] = None,
    ready: Optional[asyncio.Event] = None,
    ip_address: Optional[str] = None,
    max_connections: int = 40,
    secret_token: Optional[str] = None,
    unix: Optional[Union[str, Path, "socket"]] = None,
) -> None:
    """Creates the webhook server, runs the bootstrap phase and starts serving.

    The steps are: build the web app for ``url_path``, build an SSL context if both ``cert``
    and ``key`` are given, create the :attr:`_httpd` server, derive the webhook URL if none
    was passed, call :meth:`_bootstrap` to set the webhook and finally let the server serve.

    Raises:
        :exc:`telegram.error.TelegramError`: If loading the certificate/key pair raises an
            :exc:`ssl.SSLError`.
    """
    _LOGGER.debug("Updater thread started (webhook)")

    # The route always starts with a slash
    route = url_path if url_path.startswith("/") else f"/{url_path}"

    # Create Tornado app instance
    app = WebhookAppClass(route, self.bot, self.update_queue, secret_token)

    # Form SSL Context
    # An SSLError is raised if the private key does not match with the certificate
    # Note that we only use the SSL certificate for the WebhookServer, if the key is also
    # present. This is because the WebhookServer may not actually be in charge of performing
    # the SSL handshake, e.g. in case a reverse proxy is used
    ssl_ctx: Optional[ssl.SSLContext] = None
    if cert is not None and key is not None:
        try:
            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(cert, key)
        except ssl.SSLError as exc:
            raise TelegramError("Invalid SSL Certificate") from exc

    # Create and start server
    self._httpd = WebhookServer(listen, port, app, ssl_ctx, unix)

    if not webhook_url:
        scheme = "https" if ssl_ctx else "http"
        webhook_url = self._gen_webhook_url(
            protocol=scheme,
            listen=DefaultValue.get_value(listen),
            port=port,
            url_path=route,
        )

    # We pass along the cert to the webhook if present.
    # Passing a Path or string only works if the bot is running against a local bot API
    # server, so let's read the contents
    cert_payload = Path(cert).read_bytes() if cert else None

    await self._bootstrap(
        max_retries=bootstrap_retries,
        webhook_url=webhook_url,
        allowed_updates=allowed_updates,
        drop_pending_updates=drop_pending_updates,
        cert=cert_payload,
        ip_address=ip_address,
        max_connections=max_connections,
        secret_token=secret_token,
    )

    await self._httpd.serve_forever(ready=ready)
@staticmethod
def _gen_webhook_url(protocol: str, listen: str, port: int, url_path: str) -> str:
# TODO: double check if this should be https in any case - the docs of start_webhook
# say differently!
return f"{protocol}://{listen}:{port}{url_path}"
async def _network_loop_retry(
    self,
    action_cb: Callable[..., Coroutine],
    on_err_cb: Callable[[TelegramError], None],
    description: str,
    interval: float,
    stop_event: Optional[asyncio.Event],
) -> None:
    """Perform a loop calling `action_cb`, retrying after network errors.

    Stop condition for loop: `self.running` evaluates :obj:`False` or return value of
    `action_cb` evaluates :obj:`False`.

    Args:
        action_cb (:term:`coroutine function`): Network oriented callback function to call.
        on_err_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives
            the exception object as a parameter.
        description (:obj:`str`): Description text to use for logs and exception raised.
        interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to
            `action_cb`.
        stop_event (:class:`asyncio.Event` | :obj:`None`): Event to wait on for stopping the
            loop. Setting the event will make the loop exit even if `action_cb` is currently
            running.

    Raises:
        :exc:`telegram.error.InvalidToken`: Re-raised immediately, i.e. without retrying.
        :exc:`Exception`: Anything raised by `on_err_cb` or any non-Telegram exception raised
            by `action_cb` is not caught here.

    """

    async def do_action() -> bool:
        # Without a stop event there is nothing to race against
        if not stop_event:
            return await action_cb()

        # Run the action and a waiter for the stop event concurrently and continue as soon
        # as one of them finishes
        action_cb_task = asyncio.create_task(action_cb())
        stop_task = asyncio.create_task(stop_event.wait())
        done, pending = await asyncio.wait(
            (action_cb_task, stop_task), return_when=asyncio.FIRST_COMPLETED
        )
        # NOTE(review): `Task.cancel()` only requests the cancellation and the tasks are not
        # awaited here, so the `suppress` presumably never has anything to catch - confirm.
        with contextlib.suppress(asyncio.CancelledError):
            for task in pending:
                task.cancel()

        # The stop event wins, even if the action finished at the same time
        if stop_task in done:
            _LOGGER.debug("Network loop retry %s was cancelled", description)
            return False

        # Returns the result of the action or re-raises the exception it ended with
        return action_cb_task.result()

    _LOGGER.debug("Start network loop retry %s", description)
    cur_interval = interval
    while self.running:
        try:
            if not await do_action():
                break
        # The specific error types are listed before the generic `TelegramError` handler
        # so that they take precedence.
        except RetryAfter as exc:
            _LOGGER.info("%s", exc)
            # Wait for the time the error asks for, plus half a second
            cur_interval = 0.5 + exc.retry_after
        except TimedOut as toe:
            _LOGGER.debug("Timed out %s: %s", description, toe)
            # If failure is due to timeout, we should retry asap.
            cur_interval = 0
        except InvalidToken as pex:
            _LOGGER.error("Invalid token; aborting")
            raise pex
        except TelegramError as telegram_exc:
            _LOGGER.error("Error while %s: %s", description, telegram_exc)
            # The callback may raise in order to abort the loop
            on_err_cb(telegram_exc)

            # increase waiting times on subsequent errors up to 30secs
            cur_interval = 1 if cur_interval == 0 else min(30, 1.5 * cur_interval)
        else:
            # Successful call: go back to the regular interval
            cur_interval = interval

        # An interval of zero means that the next call is made right away
        if cur_interval:
            await asyncio.sleep(cur_interval)
async def _bootstrap(
    self,
    max_retries: int,
    webhook_url: Optional[str],
    allowed_updates: Optional[List[str]],
    drop_pending_updates: Optional[bool] = None,
    cert: Optional[bytes] = None,
    bootstrap_interval: float = 1,
    ip_address: Optional[str] = None,
    max_connections: int = 40,
    secret_token: Optional[str] = None,
) -> None:
    """Prepares the setup for fetching updates: delete or set the webhook and drop pending
    updates if appropriate. If there are unsuccessful attempts, this will retry as specified by
    :paramref:`max_retries`.

    Args:
        max_retries: Number of retries per step. A negative value retries indefinitely,
            ``0`` does not retry at all.
        webhook_url: The webhook URL to set. If falsy, the webhook is only deleted.
        allowed_updates: Passed to :meth:`telegram.Bot.set_webhook`.
        drop_pending_updates: Passed to :meth:`telegram.Bot.delete_webhook` and
            :meth:`telegram.Bot.set_webhook`.
        cert: Passed to :meth:`telegram.Bot.set_webhook` as ``certificate``.
        bootstrap_interval: Interval passed to :meth:`_network_loop_retry`.
        ip_address: Passed to :meth:`telegram.Bot.set_webhook`.
        max_connections: Passed to :meth:`telegram.Bot.set_webhook`.
        secret_token: Passed to :meth:`telegram.Bot.set_webhook`.
    """
    retries = 0

    # Both action callbacks return `False`, which makes `_network_loop_retry` leave its loop
    # after the first call that did not raise.
    async def bootstrap_del_webhook() -> bool:
        _LOGGER.debug("Deleting webhook")
        if drop_pending_updates:
            _LOGGER.debug("Dropping pending updates from Telegram server")
        await self.bot.delete_webhook(drop_pending_updates=drop_pending_updates)
        return False

    async def bootstrap_set_webhook() -> bool:
        _LOGGER.debug("Setting webhook")
        if drop_pending_updates:
            _LOGGER.debug("Dropping pending updates from Telegram server")
        await self.bot.set_webhook(
            url=webhook_url,  # type: ignore[arg-type]
            certificate=cert,
            allowed_updates=allowed_updates,
            ip_address=ip_address,
            drop_pending_updates=drop_pending_updates,
            max_connections=max_connections,
            secret_token=secret_token,
        )
        return False

    def bootstrap_on_err_cb(exc: Exception) -> None:
        # We need this since retries is an immutable object otherwise and the changes
        # wouldn't propagate outside of this function
        nonlocal retries

        if not isinstance(exc, InvalidToken) and (max_retries < 0 or retries < max_retries):
            retries += 1
            _LOGGER.warning(
                "Failed bootstrap phase; try=%s max_retries=%s", retries, max_retries
            )
        else:
            # Out of retries (or invalid token): raising here aborts `_network_loop_retry`
            _LOGGER.error("Failed bootstrap phase after %s retries (%s)", retries, exc)
            raise exc

    # Dropping pending updates from TG can be efficiently done with the drop_pending_updates
    # parameter of delete/set_webhook, even in the case of polling. Also, we want to make
    # sure that no webhook is configured in case of polling, so we just always call
    # delete_webhook for polling
    if drop_pending_updates or not webhook_url:
        await self._network_loop_retry(
            bootstrap_del_webhook,
            bootstrap_on_err_cb,
            "bootstrap del webhook",
            bootstrap_interval,
            stop_event=None,
        )

        # Reset the retries counter for the next _network_loop_retry call
        retries = 0

    # Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set,
    # so we set it anyhow.
    if webhook_url:
        await self._network_loop_retry(
            bootstrap_set_webhook,
            bootstrap_on_err_cb,
            "bootstrap set webhook",
            bootstrap_interval,
            stop_event=None,
        )
async def stop(self) -> None:
    """Stops the polling/webhook.

    The webhook server is shut down first, then the polling task. Each step does nothing
    if the respective mechanism was not started.

    .. seealso::
        :meth:`start_polling`, :meth:`start_webhook`

    Raises:
        :exc:`RuntimeError`: If the updater is not running.
    """
    async with self.__lock:
        if not self.running:
            raise RuntimeError("This Updater is not running!")

        _LOGGER.debug("Stopping Updater")

        # Reset the flag first so that the network loop stops iterating
        self._running = False

        for stopper in (self._stop_httpd, self._stop_polling):
            await stopper()

        _LOGGER.debug("Updater.stop() is complete")
async def _stop_httpd(self) -> None:
    """Stops the Webhook server by calling ``WebhookServer.shutdown()`` and forgets about it.
    Does nothing if no server is set.
    """
    server = self._httpd
    if not server:
        return

    _LOGGER.debug("Waiting for current webhook connection to be closed.")
    await server.shutdown()
    self._httpd = None
async def _stop_polling(self) -> None:
    """Stops the polling task by awaiting it and then runs the polling cleanup callback.
    Does nothing if no polling task is set.
    """
    task = self.__polling_task
    if not task:
        return

    _LOGGER.debug("Waiting background polling task to finish up.")
    # Signal the network loop to exit, then wait until it actually did
    self.__polling_task_stop_event.set()

    # It only fails in rare edge-cases, e.g. when `stop()` is called directly
    # after start_polling(), but lets better be safe than sorry ...
    with contextlib.suppress(asyncio.CancelledError):
        await task

    # Reset the state so that polling can be started again
    self.__polling_task = None
    self.__polling_task_stop_event.clear()

    cleanup = self.__polling_cleanup_cb
    if not cleanup:
        _LOGGER.warning(
            "No polling cleanup callback defined. The last fetched updates may be "
            "fetched again on the next polling start."
        )
        return

    await cleanup()
    self.__polling_cleanup_cb = None