mirror of
https://github.com/python-telegram-bot/python-telegram-bot.git
synced 2024-12-22 14:35:00 +01:00
8f9db63f4f
Co-authored-by: Bibo-Joshi <22366557+Bibo-Joshi@users.noreply.github.com>
184 lines
5.9 KiB
Python
184 lines
5.9 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# A library that provides a Python interface to the Telegram Bot API
|
|
# Copyright (C) 2015-2024
|
|
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Lesser Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Lesser Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser Public License
|
|
# along with this program. If not, see [http://www.gnu.org/licenses/].
|
|
"""This module contains the BaseProcessor class."""
|
|
from abc import ABC, abstractmethod
|
|
from asyncio import BoundedSemaphore
|
|
from types import TracebackType
|
|
from typing import Any, AsyncContextManager, Awaitable, Optional, Type, TypeVar, final
|
|
|
|
_BUPT = TypeVar("_BUPT", bound="BaseUpdateProcessor")
|
|
|
|
|
|
class BaseUpdateProcessor(AsyncContextManager["BaseUpdateProcessor"], ABC):
|
|
"""An abstract base class for update processors. You can use this class to implement
|
|
your own update processor.
|
|
|
|
Instances of this class can be used as asyncio context managers, where
|
|
|
|
.. code:: python
|
|
|
|
async with processor:
|
|
# code
|
|
|
|
is roughly equivalent to
|
|
|
|
.. code:: python
|
|
|
|
try:
|
|
await processor.initialize()
|
|
# code
|
|
finally:
|
|
await processor.shutdown()
|
|
|
|
.. seealso:: :meth:`__aenter__` and :meth:`__aexit__`.
|
|
|
|
.. seealso:: :wiki:`Concurrency`
|
|
|
|
.. versionadded:: 20.4
|
|
|
|
Args:
|
|
max_concurrent_updates (:obj:`int`): The maximum number of updates to be processed
|
|
concurrently. If this number is exceeded, new updates will be queued until the number
|
|
of currently processed updates decreases.
|
|
|
|
Raises:
|
|
:exc:`ValueError`: If :paramref:`max_concurrent_updates` is a non-positive integer.
|
|
"""
|
|
|
|
__slots__ = ("_max_concurrent_updates", "_semaphore")
|
|
|
|
def __init__(self, max_concurrent_updates: int):
|
|
self._max_concurrent_updates = max_concurrent_updates
|
|
if self.max_concurrent_updates < 1:
|
|
raise ValueError("`max_concurrent_updates` must be a positive integer!")
|
|
self._semaphore = BoundedSemaphore(self.max_concurrent_updates)
|
|
|
|
async def __aenter__(self: _BUPT) -> _BUPT: # noqa: PYI019
|
|
"""|async_context_manager| :meth:`initializes <initialize>` the Processor.
|
|
|
|
Returns:
|
|
The initialized Processor instance.
|
|
|
|
Raises:
|
|
:exc:`Exception`: If an exception is raised during initialization, :meth:`shutdown`
|
|
is called in this case.
|
|
"""
|
|
try:
|
|
await self.initialize()
|
|
except Exception:
|
|
await self.shutdown()
|
|
raise
|
|
return self
|
|
|
|
async def __aexit__(
|
|
self,
|
|
exc_type: Optional[Type[BaseException]],
|
|
exc_val: Optional[BaseException],
|
|
exc_tb: Optional[TracebackType],
|
|
) -> None:
|
|
"""|async_context_manager| :meth:`shuts down <shutdown>` the Processor."""
|
|
await self.shutdown()
|
|
|
|
@property
|
|
def max_concurrent_updates(self) -> int:
|
|
""":obj:`int`: The maximum number of updates that can be processed concurrently."""
|
|
return self._max_concurrent_updates
|
|
|
|
@abstractmethod
|
|
async def do_process_update(
|
|
self,
|
|
update: object,
|
|
coroutine: "Awaitable[Any]",
|
|
) -> None:
|
|
"""Custom implementation of how to process an update. Must be implemented by a subclass.
|
|
|
|
Warning:
|
|
This method will be called by :meth:`process_update`. It should *not* be called
|
|
manually.
|
|
|
|
Args:
|
|
update (:obj:`object`): The update to be processed.
|
|
coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the
|
|
update.
|
|
"""
|
|
|
|
@abstractmethod
|
|
async def initialize(self) -> None:
|
|
"""Initializes the processor so resources can be allocated. Must be implemented by a
|
|
subclass.
|
|
|
|
.. seealso::
|
|
:meth:`shutdown`
|
|
"""
|
|
|
|
@abstractmethod
|
|
async def shutdown(self) -> None:
|
|
"""Shutdown the processor so resources can be freed. Must be implemented by a subclass.
|
|
|
|
.. seealso::
|
|
:meth:`initialize`
|
|
"""
|
|
|
|
@final
|
|
async def process_update(
|
|
self,
|
|
update: object,
|
|
coroutine: "Awaitable[Any]",
|
|
) -> None:
|
|
"""Calls :meth:`do_process_update` with a semaphore to limit the number of concurrent
|
|
updates.
|
|
|
|
Args:
|
|
update (:obj:`object`): The update to be processed.
|
|
coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the
|
|
update.
|
|
"""
|
|
async with self._semaphore:
|
|
await self.do_process_update(update, coroutine)
|
|
|
|
|
|
class SimpleUpdateProcessor(BaseUpdateProcessor):
|
|
"""Instance of :class:`telegram.ext.BaseUpdateProcessor` that immediately awaits the
|
|
coroutine, i.e. does not apply any additional processing. This is used by default when
|
|
:attr:`telegram.ext.ApplicationBuilder.concurrent_updates` is :obj:`int`.
|
|
|
|
.. versionadded:: 20.4
|
|
"""
|
|
|
|
__slots__ = ()
|
|
|
|
async def do_process_update(
|
|
self,
|
|
update: object, # noqa: ARG002
|
|
coroutine: "Awaitable[Any]",
|
|
) -> None:
|
|
"""Immediately awaits the coroutine, i.e. does not apply any additional processing.
|
|
|
|
Args:
|
|
update (:obj:`object`): The update to be processed.
|
|
coroutine (:term:`Awaitable`): The coroutine that will be awaited to process the
|
|
update.
|
|
"""
|
|
await coroutine
|
|
|
|
async def initialize(self) -> None:
|
|
"""Does nothing."""
|
|
|
|
async def shutdown(self) -> None:
|
|
"""Does nothing."""
|