python-telegram-bot/telegram/ext/_updater.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

871 lines
35 KiB
Python
Raw Normal View History

#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
2023-01-01 21:31:29 +01:00
# Copyright (C) 2015-2023
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
2017-09-01 08:43:08 +02:00
"""This module contains the class Updater, which tries to make creating Telegram bots intuitive."""
import asyncio
2023-03-25 19:18:04 +01:00
import contextlib
2015-11-16 13:05:57 +01:00
import ssl
from pathlib import Path
from types import TracebackType
2023-02-02 18:55:07 +01:00
from typing import (
TYPE_CHECKING,
Any,
2023-02-02 18:55:07 +01:00
AsyncContextManager,
Callable,
Coroutine,
List,
Optional,
Type,
TypeVar,
Union,
)
from telegram._utils.defaultvalue import DEFAULT_80, DEFAULT_IP, DEFAULT_NONE, DefaultValue
2023-04-10 17:01:35 +02:00
from telegram._utils.logging import get_logger
from telegram._utils.repr import build_repr_with_selected_attrs
from telegram._utils.types import DVType, ODVInput
from telegram.error import InvalidToken, RetryAfter, TelegramError, TimedOut
try:
from telegram.ext._utils.webhookhandler import WebhookAppClass, WebhookServer
WEBHOOKS_AVAILABLE = True
except ImportError:
WEBHOOKS_AVAILABLE = False
2020-10-06 19:28:40 +02:00
if TYPE_CHECKING:
from telegram import Bot
2020-10-06 19:28:40 +02:00
2015-11-06 00:24:01 +01:00
_UpdaterType = TypeVar("_UpdaterType", bound="Updater") # pylint: disable=invalid-name
2023-04-10 17:01:35 +02:00
_LOGGER = get_logger(__name__)
2023-02-02 18:55:07 +01:00
class Updater(AsyncContextManager["Updater"]):
    """This class fetches updates for the bot either via long polling or by starting a webhook
    server. Received updates are enqueued into the :attr:`update_queue` and may be fetched from
    there to handle them appropriately.

    Instances of this class can be used as asyncio context managers, where

    .. code:: python

        async with updater:
            # code

    is roughly equivalent to

    .. code:: python

        try:
            await updater.initialize()
            # code
        finally:
            await updater.shutdown()

    .. seealso:: :meth:`__aenter__` and :meth:`__aexit__`.

    .. seealso:: :wiki:`Architecture Overview <Architecture>`,
        :wiki:`Builder Pattern <Builder-Pattern>`

    .. versionchanged:: 20.0

        * Removed argument and attribute ``user_sig_handler``
        * The only arguments and attributes are now :attr:`bot` and :attr:`update_queue` as now
          the sole purpose of this class is to fetch updates. The entry point to a PTB application
          is now :class:`telegram.ext.Application`.

    Args:
        bot (:class:`telegram.Bot`): The bot used with this Updater.
        update_queue (:class:`asyncio.Queue`): Queue for the updates.

    Attributes:
        bot (:class:`telegram.Bot`): The bot used with this Updater.
        update_queue (:class:`asyncio.Queue`): Queue for the updates.

    """

    # Every attribute assigned in ``__init__`` must have a slot. A missing entry either makes
    # the assignment fail with ``AttributeError`` (when no base class provides ``__dict__``) or
    # silently falls back to a per-instance ``__dict__``, defeating the purpose of the slots.
    # Names with two leading underscores are name-mangled exactly like the attributes are.
    __slots__ = (
        "__lock",
        "__polling_cleanup_cb",
        "__polling_task",
        "_httpd",
        "_initialized",
        "_last_update_id",
        "_running",
        "bot",
        "update_queue",
    )

    def __init__(
        self,
        bot: "Bot",
        update_queue: "asyncio.Queue[object]",
    ):
        # Collaborators handed in by the caller.
        self.bot: Bot = bot
        self.update_queue: asyncio.Queue[object] = update_queue

        # Offset passed to `get_updates`; advanced past the last update that was enqueued.
        self._last_update_id = 0
        # Lifecycle flags toggled by initialize/shutdown and start_*/stop respectively.
        self._running = False
        self._initialized = False

        # Webhook server instance, only set while running in webhook mode.
        self._httpd: Optional[WebhookServer] = None
        # Serializes start_polling/start_webhook/stop against each other.
        self.__lock = asyncio.Lock()
        # Background task running the polling loop and the coroutine function to await once
        # that task has been stopped. Both are only set while running in polling mode.
        self.__polling_task: Optional[asyncio.Task] = None
        self.__polling_cleanup_cb: Optional[Callable[[], Coroutine[Any, Any, None]]] = None
async def __aenter__(self: _UpdaterType) -> _UpdaterType:  # noqa: PYI019
    """
    |async_context_manager| :meth:`initializes <initialize>` the Updater.

    Returns:
        The initialized Updater instance.

    Raises:
        :exc:`Exception`: If an exception is raised during initialization, :meth:`shutdown`
            is called in this case.
    """
    try:
        await self.initialize()
    except Exception as exc:
        # Initialization failed half-way: release what was acquired, then propagate.
        await self.shutdown()
        raise exc
    return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
"""|async_context_manager| :meth:`shuts down <shutdown>` the Updater."""
# Make sure not to return `True` so that exceptions are not suppressed
# https://docs.python.org/3/reference/datamodel.html?#object.__aexit__
await self.shutdown()
def __repr__(self) -> str:
    """Give a string representation of the updater in the form ``Updater[bot=...]``.

    As this class doesn't implement :meth:`object.__str__`, the default implementation
    will be used, which is equivalent to :meth:`__repr__`.

    Returns:
        :obj:`str`
    """
    # Only the bot is included in the representation.
    selected_attrs = {"bot": self.bot}
    return build_repr_with_selected_attrs(self, **selected_attrs)
@property
def running(self) -> bool:
return self._running
async def initialize(self) -> None:
    """Initializes the Updater & the associated :attr:`bot` by calling
    :meth:`telegram.Bot.initialize`.

    Calling this method on an already initialized Updater is a no-op apart from a debug log.

    .. seealso::
        :meth:`shutdown`
    """
    if not self._initialized:
        await self.bot.initialize()
        # Only flag success once the bot has been initialized without raising.
        self._initialized = True
        return

    _LOGGER.debug("This Updater is already initialized.")
async def shutdown(self) -> None:
    """
    Shutdown the Updater & the associated :attr:`bot` by calling :meth:`telegram.Bot.shutdown`.

    Calling this method on an Updater that is not initialized is a no-op apart from a debug log.

    .. seealso::
        :meth:`initialize`

    Raises:
        :exc:`RuntimeError`: If the updater is still running.
    """
    if self.running:
        raise RuntimeError("This Updater is still running!")

    if self._initialized:
        await self.bot.shutdown()
        self._initialized = False
        _LOGGER.debug("Shut down of Updater complete")
    else:
        _LOGGER.debug("This Updater is already shut down. Returning.")
async def start_polling(
    self,
    poll_interval: float = 0.0,
    timeout: int = 10,
    bootstrap_retries: int = -1,
    read_timeout: ODVInput[float] = DEFAULT_NONE,
    write_timeout: ODVInput[float] = DEFAULT_NONE,
    connect_timeout: ODVInput[float] = DEFAULT_NONE,
    pool_timeout: ODVInput[float] = DEFAULT_NONE,
    allowed_updates: Optional[List[str]] = None,
    drop_pending_updates: Optional[bool] = None,
    error_callback: Optional[Callable[[TelegramError], None]] = None,
) -> "asyncio.Queue[object]":
    """Starts polling updates from Telegram.

    .. versionchanged:: 20.0
        Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates`.

    Args:
        poll_interval (:obj:`float`, optional): Time to wait between polling updates from
            Telegram in seconds. Default is ``0.0``.
        timeout (:obj:`int`, optional): Passed to
            :paramref:`telegram.Bot.get_updates.timeout`. Defaults to ``10`` seconds.
        bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
            :class:`telegram.ext.Updater` will retry on failures on the Telegram server.

            * < 0 - retry indefinitely (default)
            * 0 - no retries
            * > 0 - retry up to X times
        read_timeout (:obj:`float`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.read_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. versionchanged:: 20.7
                Defaults to :attr:`~telegram.request.BaseRequest.DEFAULT_NONE` instead of
                ``2``.
            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_read_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        write_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.write_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_write_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        connect_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.connect_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_connect_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        pool_timeout (:obj:`float` | :obj:`None`, optional): Value to pass to
            :paramref:`telegram.Bot.get_updates.pool_timeout`. Defaults to
            :attr:`~telegram.request.BaseRequest.DEFAULT_NONE`.

            .. deprecated:: 20.7
                Deprecated in favor of setting the timeout via
                :meth:`telegram.ext.ApplicationBuilder.get_updates_pool_timeout` or
                :paramref:`telegram.Bot.get_updates_request`.
        allowed_updates (List[:obj:`str`], optional): Passed to
            :meth:`telegram.Bot.get_updates`.
        drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
            Telegram servers before actually starting to poll. Default is :obj:`False`.

            .. versionadded :: 13.4
        error_callback (Callable[[:exc:`telegram.error.TelegramError`], :obj:`None`], \
            optional): Callback to handle :exc:`telegram.error.TelegramError` s that occur
            while calling :meth:`telegram.Bot.get_updates` during polling. Defaults to
            :obj:`None`, in which case errors will be logged. Callback signature::

                def callback(error: telegram.error.TelegramError)

            Note:
                The :paramref:`error_callback` must *not* be a :term:`coroutine function`! If
                asynchronous behavior of the callback is wanted, please schedule a task from
                within the callback.

    Returns:
        :class:`asyncio.Queue`: The update queue that can be filled from the main thread.

    Raises:
        :exc:`TypeError`: If :paramref:`error_callback` is a :term:`coroutine function`.
        :exc:`RuntimeError`: If the updater is already running or was not initialized.

    """
    # We refrain from issuing deprecation warnings for the timeout parameters here, as we
    # already issue them in `Application`. This means that there are no warnings when using
    # `Updater` without `Application`, but this is a rather special use case.

    if error_callback and asyncio.iscoroutinefunction(error_callback):
        raise TypeError(
            "The `error_callback` must not be a coroutine function! Use an ordinary function "
            "instead. "
        )

    async with self.__lock:
        if self.running:
            raise RuntimeError("This Updater is already running!")
        if not self._initialized:
            raise RuntimeError("This Updater was not initialized via `Updater.initialize`!")

        # Flag as running up front; rolled back below if starting up fails.
        self._running = True

        try:
            # The event is set by `_start_polling` once the background task is scheduled.
            started = asyncio.Event()
            await self._start_polling(
                poll_interval=poll_interval,
                timeout=timeout,
                read_timeout=read_timeout,
                write_timeout=write_timeout,
                connect_timeout=connect_timeout,
                pool_timeout=pool_timeout,
                bootstrap_retries=bootstrap_retries,
                drop_pending_updates=drop_pending_updates,
                allowed_updates=allowed_updates,
                ready=started,
                error_callback=error_callback,
            )

            _LOGGER.debug("Waiting for polling to start")
            await started.wait()
            _LOGGER.debug("Polling updates from Telegram started")
        except Exception as exc:
            self._running = False
            raise exc

        return self.update_queue
async def _start_polling(
    self,
    poll_interval: float,
    timeout: int,
    read_timeout: ODVInput[float],
    write_timeout: ODVInput[float],
    connect_timeout: ODVInput[float],
    pool_timeout: ODVInput[float],
    bootstrap_retries: int,
    drop_pending_updates: Optional[bool],
    allowed_updates: Optional[List[str]],
    ready: asyncio.Event,
    error_callback: Optional[Callable[[TelegramError], None]],
) -> None:
    """Bootstraps and then schedules the background task that polls for updates.

    Runs :meth:`_bootstrap` with an empty webhook URL, creates the polling task via
    :meth:`_network_loop_retry`, stores a cleanup coroutine function for :meth:`_stop_polling`
    and finally sets :paramref:`ready`.

    Args:
        poll_interval: Interval passed to :meth:`_network_loop_retry`.
        timeout: Long polling timeout passed to ``get_updates``.
        read_timeout: Passed to ``get_updates``.
        write_timeout: Passed to ``get_updates``.
        connect_timeout: Passed to ``get_updates``.
        pool_timeout: Passed to ``get_updates``.
        bootstrap_retries: Maximum number of retries for the bootstrapping phase.
        drop_pending_updates: Passed to :meth:`_bootstrap`.
        allowed_updates: Passed to ``get_updates``.
        ready: Event that is set once the polling task has been scheduled.
        error_callback: Called with every :exc:`telegram.error.TelegramError` raised while
            polling. If falsy, such errors are logged instead.
    """
    _LOGGER.debug("Updater started (polling)")

    # the bootstrapping phase does two things:
    # 1) make sure there is no webhook set
    # 2) apply drop_pending_updates
    await self._bootstrap(
        bootstrap_retries,
        drop_pending_updates=drop_pending_updates,
        webhook_url="",
        allowed_updates=None,
    )

    _LOGGER.debug("Bootstrap done")

    async def polling_action_cb() -> bool:
        try:
            updates = await self.bot.get_updates(
                offset=self._last_update_id,
                timeout=timeout,
                read_timeout=read_timeout,
                connect_timeout=connect_timeout,
                write_timeout=write_timeout,
                pool_timeout=pool_timeout,
                allowed_updates=allowed_updates,
            )
        except TelegramError as exc:
            # TelegramErrors should be processed by the network retry loop
            raise exc
        except Exception as exc:
            # Other exceptions should not. Let's log them for now.
            _LOGGER.critical(
                "Something went wrong processing the data received from Telegram. "
                "Received data was *not* processed!",
                exc_info=exc,
            )
            return True

        if updates:
            if not self.running:
                _LOGGER.critical(
                    "Updater stopped unexpectedly. Pulled updates will be ignored and pulled "
                    "again on restart."
                )
            else:
                for update in updates:
                    await self.update_queue.put(update)
                self._last_update_id = updates[-1].update_id + 1  # Add one to 'confirm' it

        return True  # Keep fetching updates & don't quit. Polls with poll_interval.

    def default_error_callback(exc: TelegramError) -> None:
        _LOGGER.exception("Exception happened while polling for updates.", exc_info=exc)

    # Start task that runs in background, pulls
    # updates from Telegram and inserts them in the update queue of the
    # Application.
    self.__polling_task = asyncio.create_task(
        self._network_loop_retry(
            action_cb=polling_action_cb,
            on_err_cb=error_callback or default_error_callback,
            description="getting Updates",
            interval=poll_interval,
        ),
        name="Updater:start_polling:polling_task",
    )

    # Prepare a cleanup callback to await on _stop_polling
    # Calling get_updates one more time with the latest `offset` parameter ensures that
    # all updates that were put into the update queue are also marked as "read" to TG,
    # so we do not receive them again on the next startup
    # We define this here so that we can use the same parameters as in the polling task
    async def _get_updates_cleanup() -> None:
        _LOGGER.debug(
            "Calling `get_updates` one more time to mark all fetched updates as read."
        )
        try:
            await self.bot.get_updates(
                offset=self._last_update_id,
                # We don't want to do long polling here!
                timeout=0,
                read_timeout=read_timeout,
                connect_timeout=connect_timeout,
                write_timeout=write_timeout,
                pool_timeout=pool_timeout,
                allowed_updates=allowed_updates,
            )
        except TelegramError as exc:
            # The exception must be passed as a positional argument: `logging` only applies
            # %-formatting when arguments are given, so without it the message would be
            # emitted with a literal "%s" in place of the error.
            _LOGGER.error(
                "Error while calling `get_updates` one more time to mark all fetched updates "
                "as read: %s. Suppressing error to ensure graceful shutdown. When polling for "
                "updates is restarted, updates may be fetched again. Please adjust timeouts "
                "via `ApplicationBuilder` or the parameter `get_updates_request` of `Bot`.",
                exc,
                exc_info=exc,
            )

    self.__polling_cleanup_cb = _get_updates_cleanup

    if ready is not None:
        ready.set()
async def start_webhook(
    self,
    listen: DVType[str] = DEFAULT_IP,
    port: DVType[int] = DEFAULT_80,
    url_path: str = "",
    cert: Optional[Union[str, Path]] = None,
    key: Optional[Union[str, Path]] = None,
    bootstrap_retries: int = 0,
    webhook_url: Optional[str] = None,
    allowed_updates: Optional[List[str]] = None,
    drop_pending_updates: Optional[bool] = None,
    ip_address: Optional[str] = None,
    max_connections: int = 40,
    secret_token: Optional[str] = None,
    unix: Optional[Union[str, Path]] = None,
) -> "asyncio.Queue[object]":
    """
    Starts a small http server to listen for updates via webhook. If :paramref:`cert`
    and :paramref:`key` are not provided, the webhook will be started directly on
    ``http://listen:port/url_path``, so SSL can be handled by another
    application. Else, the webhook will be started on
    ``https://listen:port/url_path``. Also calls :meth:`telegram.Bot.set_webhook` as required.

    Important:
        If you want to use this method, you must install PTB with the optional requirement
        ``webhooks``, i.e.

        .. code-block:: bash

           pip install "python-telegram-bot[webhooks]"

    .. seealso:: :wiki:`Webhooks`

    .. versionchanged:: 13.4
        :meth:`start_webhook` now *always* calls :meth:`telegram.Bot.set_webhook`, so pass
        ``webhook_url`` instead of calling ``updater.bot.set_webhook(webhook_url)`` manually.
    .. versionchanged:: 20.0
        * Removed the ``clean`` argument in favor of :paramref:`drop_pending_updates` and
          removed the deprecated argument ``force_event_loop``.

    Args:
        listen (:obj:`str`, optional): IP-Address to listen on. Defaults to
            `127.0.0.1 <https://en.wikipedia.org/wiki/Localhost>`_.
        port (:obj:`int`, optional): Port the bot should be listening on. Must be one of
            :attr:`telegram.constants.SUPPORTED_WEBHOOK_PORTS` unless the bot is running
            behind a proxy. Defaults to ``80``.
        url_path (:obj:`str`, optional): Path inside url (http(s)://listen:port/<url_path>).
            Defaults to ``''``.
        cert (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL certificate file.
        key (:class:`pathlib.Path` | :obj:`str`, optional): Path to the SSL key file.
        drop_pending_updates (:obj:`bool`, optional): Whether to clean any pending updates on
            Telegram servers before actually starting to poll. Default is :obj:`False`.

            .. versionadded :: 13.4
        bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the
            :class:`telegram.ext.Updater` will retry on failures on the Telegram server.

            * < 0 - retry indefinitely
            * 0 - no retries (default)
            * > 0 - retry up to X times
        webhook_url (:obj:`str`, optional): Explicitly specify the webhook url. Useful behind
            NAT, reverse proxy, etc. Default is derived from :paramref:`listen`,
            :paramref:`port`, :paramref:`url_path`, :paramref:`cert`, and :paramref:`key`.
        ip_address (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
            Defaults to :obj:`None`.

            .. versionadded :: 13.4
        allowed_updates (List[:obj:`str`], optional): Passed to
            :meth:`telegram.Bot.set_webhook`. Defaults to :obj:`None`.
        max_connections (:obj:`int`, optional): Passed to
            :meth:`telegram.Bot.set_webhook`. Defaults to ``40``.

            .. versionadded:: 13.6
        secret_token (:obj:`str`, optional): Passed to :meth:`telegram.Bot.set_webhook`.
            Defaults to :obj:`None`.

            When added, the web server started by this call will expect the token to be set in
            the ``X-Telegram-Bot-Api-Secret-Token`` header of an incoming request and will
            raise a :class:`http.HTTPStatus.FORBIDDEN <http.HTTPStatus>` error if either the
            header isn't set or it is set to a wrong token.

            .. versionadded:: 20.0
        unix (:class:`pathlib.Path` | :obj:`str`, optional): Path to the unix socket file. Path
            does not need to exist, in which case the file will be created.

            Caution:
                This parameter is a replacement for the default TCP bind. Therefore, it is
                mutually exclusive with :paramref:`listen` and :paramref:`port`. When using
                this param, you must also run a reverse proxy to the unix socket and set the
                appropriate :paramref:`webhook_url`.

            .. versionadded:: NEXT.VERSION

    Returns:
        :class:`asyncio.Queue`: The update queue that can be filled from the main thread.

    Raises:
        :exc:`RuntimeError`: If the updater is already running or was not initialized, if the
            optional ``webhooks`` dependencies are not installed, or if :paramref:`unix` is
            combined with :paramref:`listen` / :paramref:`port` or passed without
            :paramref:`webhook_url`.
    """
    if not WEBHOOKS_AVAILABLE:
        raise RuntimeError(
            "To use `start_webhook`, PTB must be installed via `pip install "
            '"python-telegram-bot[webhooks]"`.'
        )

    # unix has special requirements what must and mustn't be set when using it
    if unix:
        error_msg = (
            "You can not pass unix and {0}, only use one. Unix if you want to "
            "initialize a unix socket, or {0} for a standard TCP server."
        )
        # `listen`/`port` still being `DefaultValue` instances means the caller did not
        # pass them explicitly.
        if not isinstance(listen, DefaultValue):
            raise RuntimeError(error_msg.format("listen"))
        if not isinstance(port, DefaultValue):
            raise RuntimeError(error_msg.format("port"))
        if not webhook_url:
            raise RuntimeError(
                "Since you set unix, you also need to set the URL to the webhook "
                "of the proxy you run in front of the unix socket."
            )

    async with self.__lock:
        if self.running:
            raise RuntimeError("This Updater is already running!")
        if not self._initialized:
            raise RuntimeError("This Updater was not initialized via `Updater.initialize`!")

        # Flag as running up front; rolled back below if starting up fails.
        self._running = True

        try:
            # Create & start tasks
            webhook_ready = asyncio.Event()

            await self._start_webhook(
                listen=DefaultValue.get_value(listen),
                port=DefaultValue.get_value(port),
                url_path=url_path,
                cert=cert,
                key=key,
                bootstrap_retries=bootstrap_retries,
                drop_pending_updates=drop_pending_updates,
                webhook_url=webhook_url,
                allowed_updates=allowed_updates,
                ready=webhook_ready,
                ip_address=ip_address,
                max_connections=max_connections,
                secret_token=secret_token,
                unix=unix,
            )

            _LOGGER.debug("Waiting for webhook server to start")
            await webhook_ready.wait()
            _LOGGER.debug("Webhook server started")
        except Exception as exc:
            self._running = False
            raise exc

        # Return the update queue so the main thread can insert updates
        return self.update_queue
2015-11-16 13:05:57 +01:00
async def _start_webhook(
    self,
    listen: str,
    port: int,
    url_path: str,
    bootstrap_retries: int,
    allowed_updates: Optional[List[str]],
    cert: Optional[Union[str, Path]] = None,
    key: Optional[Union[str, Path]] = None,
    drop_pending_updates: Optional[bool] = None,
    webhook_url: Optional[str] = None,
    ready: Optional[asyncio.Event] = None,
    ip_address: Optional[str] = None,
    max_connections: int = 40,
    secret_token: Optional[str] = None,
    unix: Optional[Union[str, Path]] = None,
) -> None:
    """Creates the webhook server, bootstraps the webhook and starts serving.

    Builds the ``WebhookAppClass``/``WebhookServer`` pair, derives a webhook URL if none was
    given, runs :meth:`_bootstrap` and finally awaits ``WebhookServer.serve_forever``, to
    which :paramref:`ready` is handed over.

    Raises:
        :exc:`telegram.error.TelegramError`: If setting up the SSL context raises
            :exc:`ssl.SSLError`.
    """
    _LOGGER.debug("Updater thread started (webhook)")

    # Normalize the path so that it always carries a leading slash.
    if not url_path.startswith("/"):
        url_path = f"/{url_path}"

    # Create Tornado app instance
    app = WebhookAppClass(url_path, self.bot, self.update_queue, secret_token)

    # Form SSL Context
    # An SSLError is raised if the private key does not match with the certificate
    # Note that we only use the SSL certificate for the WebhookServer, if the key is also
    # present. This is because the WebhookServer may not actually be in charge of performing
    # the SSL handshake, e.g. in case a reverse proxy is used
    ssl_ctx: Optional[ssl.SSLContext] = None
    if cert is not None and key is not None:
        try:
            ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_ctx.load_cert_chain(cert, key)
        except ssl.SSLError as exc:
            raise TelegramError("Invalid SSL Certificate") from exc

    # Create and start server
    self._httpd = WebhookServer(listen, port, app, ssl_ctx, unix)

    if not webhook_url:
        protocol = "https" if ssl_ctx else "http"
        webhook_url = self._gen_webhook_url(
            protocol=protocol,
            listen=DefaultValue.get_value(listen),
            port=port,
            url_path=url_path,
        )

    # We pass along the cert to the webhook if present.
    # Passing a Path or string only works if the bot is running against a local bot API
    # server, so let's read the contents
    cert_bytes = Path(cert).read_bytes() if cert else None
    await self._bootstrap(
        cert=cert_bytes,
        max_retries=bootstrap_retries,
        drop_pending_updates=drop_pending_updates,
        webhook_url=webhook_url,
        allowed_updates=allowed_updates,
        ip_address=ip_address,
        max_connections=max_connections,
        secret_token=secret_token,
    )

    await self._httpd.serve_forever(ready=ready)
@staticmethod
def _gen_webhook_url(protocol: str, listen: str, port: int, url_path: str) -> str:
# TODO: double check if this should be https in any case - the docs of start_webhook
# say differently!
return f"{protocol}://{listen}:{port}{url_path}"
async def _network_loop_retry(
    self,
    action_cb: Callable[..., Coroutine],
    on_err_cb: Callable[[TelegramError], None],
    description: str,
    interval: float,
) -> None:
    """Perform a loop calling `action_cb`, retrying after network errors.

    Stop condition for loop: `self.running` evaluates :obj:`False` or return value of
    `action_cb` evaluates :obj:`False`.

    Args:
        action_cb (:term:`coroutine function`): Network oriented callback function to call.
        on_err_cb (:obj:`callable`): Callback to call when TelegramError is caught. Receives
            the exception object as a parameter.
        description (:obj:`str`): Description text to use for logs and exception raised.
        interval (:obj:`float` | :obj:`int`): Interval to sleep between each call to
            `action_cb`.

    """
    _LOGGER.debug("Start network loop retry %s", description)
    delay = interval
    try:
        while self.running:
            try:
                keep_going = await action_cb()
            except RetryAfter as exc:
                # Wait slightly longer than the requested time before retrying.
                _LOGGER.info("%s", exc)
                delay = 0.5 + exc.retry_after
            except TimedOut as toe:
                _LOGGER.debug("Timed out %s: %s", description, toe)
                # If failure is due to timeout, we should retry asap.
                delay = 0
            except InvalidToken as pex:
                _LOGGER.error("Invalid token; aborting")
                raise pex
            except TelegramError as telegram_exc:
                _LOGGER.error("Error while %s: %s", description, telegram_exc)
                on_err_cb(telegram_exc)

                # increase waiting times on subsequent errors up to 30secs
                delay = 1 if delay == 0 else min(30, 1.5 * delay)
            else:
                if not keep_going:
                    break
                # A successful call resets the waiting time to the regular interval.
                delay = interval

            if delay:
                await asyncio.sleep(delay)

    except asyncio.CancelledError:
        # Cancellation is swallowed here, so the coroutine finishes normally.
        _LOGGER.debug("Network loop retry %s was cancelled", description)
async def _bootstrap(
    self,
    max_retries: int,
    webhook_url: Optional[str],
    allowed_updates: Optional[List[str]],
    drop_pending_updates: Optional[bool] = None,
    cert: Optional[bytes] = None,
    bootstrap_interval: float = 1,
    ip_address: Optional[str] = None,
    max_connections: int = 40,
    secret_token: Optional[str] = None,
) -> None:
    """Prepares the setup for fetching updates: delete or set the webhook and drop pending
    updates if appropriate. If there are unsuccessful attempts, this will retry as specified by
    :paramref:`max_retries`.
    """
    attempts = 0

    async def bootstrap_del_webhook() -> bool:
        _LOGGER.debug("Deleting webhook")
        if drop_pending_updates:
            _LOGGER.debug("Dropping pending updates from Telegram server")
        await self.bot.delete_webhook(drop_pending_updates=drop_pending_updates)
        # Returning False ends the retry loop after the first successful call.
        return False

    async def bootstrap_set_webhook() -> bool:
        _LOGGER.debug("Setting webhook")
        if drop_pending_updates:
            _LOGGER.debug("Dropping pending updates from Telegram server")
        await self.bot.set_webhook(
            url=webhook_url,
            certificate=cert,
            allowed_updates=allowed_updates,
            ip_address=ip_address,
            drop_pending_updates=drop_pending_updates,
            max_connections=max_connections,
            secret_token=secret_token,
        )
        # Returning False ends the retry loop after the first successful call.
        return False

    def bootstrap_on_err_cb(exc: Exception) -> None:
        # We need this since the counter is an immutable object otherwise and the changes
        # wouldn't propagate outside of this function
        nonlocal attempts

        may_retry = max_retries < 0 or attempts < max_retries
        if isinstance(exc, InvalidToken) or not may_retry:
            _LOGGER.error("Failed bootstrap phase after %s retries (%s)", attempts, exc)
            raise exc

        attempts += 1
        _LOGGER.warning("Failed bootstrap phase; try=%s max_retries=%s", attempts, max_retries)

    # Dropping pending updates from TG can be efficiently done with the drop_pending_updates
    # parameter of delete/start_webhook, even in the case of polling. Also, we want to make
    # sure that no webhook is configured in case of polling, so we just always call
    # delete_webhook for polling
    if drop_pending_updates or not webhook_url:
        await self._network_loop_retry(
            bootstrap_del_webhook,
            bootstrap_on_err_cb,
            "bootstrap del webhook",
            bootstrap_interval,
        )

        # Reset the retries counter for the next _network_loop_retry call
        attempts = 0

    # Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set,
    # so we set it anyhow.
    if webhook_url:
        await self._network_loop_retry(
            bootstrap_set_webhook,
            bootstrap_on_err_cb,
            "bootstrap set webhook",
            bootstrap_interval,
        )
async def stop(self) -> None:
    """Stops the polling/webhook.

    .. seealso::
        :meth:`start_polling`, :meth:`start_webhook`

    Raises:
        :exc:`RuntimeError`: If the updater is not running.
    """
    async with self.__lock:
        if not self.running:
            raise RuntimeError("This Updater is not running!")

        _LOGGER.debug("Stopping Updater")

        # Clear the flag first: the retry loop checks it on every iteration.
        self._running = False

        # Each helper is a no-op if the corresponding mode was not started.
        await self._stop_httpd()
        await self._stop_polling()

        _LOGGER.debug("Updater.stop() is complete")
2015-11-23 17:40:39 +01:00
async def _stop_httpd(self) -> None:
    """Stops the Webhook server by calling ``WebhookServer.shutdown()``"""
    if not self._httpd:
        # No webhook server was set up, so there is nothing to stop.
        return

    _LOGGER.debug("Waiting for current webhook connection to be closed.")
    await self._httpd.shutdown()
    self._httpd = None
async def _stop_polling(self) -> None:
    """Stops the polling task by cancelling and awaiting it, then runs the stored cleanup
    callback, if there is one.
    """
    task = self.__polling_task
    if not task:
        # No polling task was created, so there is nothing to stop.
        return

    _LOGGER.debug("Waiting background polling task to finish up.")
    task.cancel()

    with contextlib.suppress(asyncio.CancelledError):
        await task
        # It only fails in rare edge-cases, e.g. when `stop()` is called directly
        # after start_polling(), but lets better be safe than sorry ...

    self.__polling_task = None

    cleanup = self.__polling_cleanup_cb
    if cleanup:
        await cleanup()
        self.__polling_cleanup_cb = None
    else:
        _LOGGER.warning(
            "No polling cleanup callback defined. The last fetched updates may be "
            "fetched again on the next polling start."
        )