mirror of
https://github.com/teloxide/teloxide.git
synced 2025-03-14 11:44:04 +01:00
Add a way to get info from/into Throttle
This patch adds `Throttle::{limits,set_limits,queue_status}` functions and a `QueueStatus` struct, allowing to get the current status of the worker & change limits on the fly.
This commit is contained in:
parent
0dc565211f
commit
34e73748c3
2 changed files with 259 additions and 136 deletions
|
@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
- `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`} ([#76][pr76])
|
||||
- `DefaultParseMode::parse_mode` which allows to get currently used default parse mode ([#77][pr77])
|
||||
- `Throttle::{limits,set_limits,queue_status}` functions and `QueueStatus` struct ([#77][pr77])
|
||||
- Getters for fields nested in `Chat` ([#80][pr80])
|
||||
- API errors: `ApiError::NotEnoughRightsToManagePins`, `ApiError::BotKickedFromSupergroup` ([#84][pr84])
|
||||
- Telegram bot API 5.2 support ([#86][pr86])
|
||||
|
|
|
@ -23,91 +23,6 @@ use crate::{
|
|||
types::*,
|
||||
};
|
||||
|
||||
// Throttling is quite complicated. This comment describes the algorithm of the
|
||||
// current implementation.
|
||||
//
|
||||
// ### Request
|
||||
//
|
||||
// When a throttling request is sent, it sends a tuple of `ChatId` and
|
||||
// `Sender<()>` to the worker. Then the request waits for a notification from
|
||||
// the worker. When notification is received, it sends the underlying request.
|
||||
//
|
||||
// ### Worker
|
||||
//
|
||||
// The worker does the most important job -- it ensures that the limits are
|
||||
// never exceeded.
|
||||
//
|
||||
// The worker stores a history of requests sent in the last minute (and to which
|
||||
// chats they were sent) and a queue of pending updates.
|
||||
//
|
||||
// The worker does the following algorithm loop:
|
||||
//
|
||||
// 1. If the queue is empty, wait for the first message in incoming channel (and
|
||||
// add it to the queue).
|
||||
//
|
||||
// 2. Read all present messages from an incoming channel and transfer them to
|
||||
// the queue.
|
||||
//
|
||||
// 3. Record the current time.
|
||||
//
|
||||
// 4. Clear the history from records whose time < (current time - minute).
|
||||
//
|
||||
// 5. Count all requests which were sent last second, `allowed =
|
||||
// limit.messages_per_sec_overall - count`.
|
||||
//
|
||||
// 6. If `allowed == 0` wait a bit and `continue` to the next iteration.
|
||||
//
|
||||
// 7. Count how many requests were sent to which chats (i.e.: create
|
||||
// `Map<ChatId, Count>`). (Note: the same map, but for last minute also exists,
|
||||
// but it's updated, instead of recreation.)
|
||||
//
|
||||
// 8. While `allowed >= 0` search for requests whose chat hasn't exceeded the
|
||||
// limits (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify
|
||||
// the request that it can be now executed, increase counts, add record to the
|
||||
// history.
|
||||
|
||||
/// One second — the window used for per-second rate limits.
const SECOND: Duration = Duration::from_secs(1);

/// One minute — the window used for per-minute (per-chat) rate limits.
const MINUTE: Duration = Duration::from_secs(60);

// Delay between worker iterations.
//
// For now it's `second/4`, but that number is chosen pretty randomly, we may
// want to change this.
const DELAY: Duration = Duration::from_millis(250);
|
||||
|
||||
/// Telegram request limits.
///
/// This struct is used in [`Throttle`].
///
/// Note that you may ask telegram [@BotSupport] to increase limits for your
/// particular bot if it has a lot of users (but they may or may not do that).
///
/// [@BotSupport]: https://t.me/botsupport
// `Debug` added for consistency with the rest of the public API: public
// types should be debuggable.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Limits {
    /// Allowed messages in one chat per second.
    pub messages_per_sec_chat: u32,

    /// Allowed messages in one chat per minute.
    pub messages_per_min_chat: u32,

    /// Allowed messages per second.
    pub messages_per_sec_overall: u32,
}

/// Defaults are taken from [telegram documentation][tgdoc].
///
/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
impl Default for Limits {
    fn default() -> Self {
        Self {
            messages_per_sec_chat: 1,
            messages_per_min_chat: 20,
            messages_per_sec_overall: 30,
        }
    }
}
|
||||
|
||||
/// Automatic request limits respecting mechanism.
|
||||
///
|
||||
/// Telegram has strict [limits], which, if exceeded will sooner or later cause
|
||||
|
@ -151,6 +66,176 @@ pub struct Throttle<B> {
|
|||
bot: B,
|
||||
// `RequestLock` allows to unlock requests (allowing them to be sent).
|
||||
queue: mpsc::Sender<(ChatIdHash, RequestLock)>,
|
||||
info_tx: mpsc::Sender<InfoMessage>,
|
||||
}
|
||||
|
||||
impl<B> Throttle<B> {
|
||||
/// Creates new [`Throttle`] alongside with worker future.
|
||||
///
|
||||
/// Note: [`Throttle`] will only send requests if returned worker is
|
||||
/// polled/spawned/awaited.
|
||||
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
|
||||
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
|
||||
let (info_tx, info_rx) = mpsc::channel(2);
|
||||
|
||||
let worker = worker(limits, rx, info_rx);
|
||||
let this = Self {
|
||||
bot,
|
||||
queue: tx,
|
||||
info_tx,
|
||||
};
|
||||
|
||||
(this, worker)
|
||||
}
|
||||
|
||||
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
|
||||
///
|
||||
/// Note: it's recommended to use [`RequesterExt::throttle`] instead.
|
||||
///
|
||||
/// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle
|
||||
pub fn new_spawn(bot: B, limits: Limits) -> Self
|
||||
where
|
||||
// Basically, I hate this bound.
|
||||
// This is yet another problem caused by [rust-lang/#76882].
|
||||
// And I think it *is* a bug.
|
||||
//
|
||||
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
|
||||
//
|
||||
// Though crucially I can't think of a case with non-static bot.
|
||||
// But anyway, it doesn't change the fact that this bound is redundant.
|
||||
//
|
||||
// (waffle)
|
||||
B: 'static,
|
||||
{
|
||||
let (this, worker) = Self::new(bot, limits);
|
||||
tokio::spawn(worker);
|
||||
this
|
||||
}
|
||||
|
||||
/// Allows to access inner bot
|
||||
pub fn inner(&self) -> &B {
|
||||
&self.bot
|
||||
}
|
||||
|
||||
/// Unwraps inner bot
|
||||
pub fn into_inner(self) -> B {
|
||||
self.bot
|
||||
}
|
||||
|
||||
/// Returns currently used [`Limits`].
|
||||
pub async fn limits(&self) -> Limits {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.info_tx
|
||||
.send(InfoMessage::GetLimits { response: tx })
|
||||
.await
|
||||
.expect(WORKER_DIED);
|
||||
|
||||
rx.await.expect(WORKER_DIED)
|
||||
}
|
||||
|
||||
/// Sets new limits.
|
||||
///
|
||||
/// Note: changes may not be applied imidiately.
|
||||
pub async fn set_limits(&self, new: Limits) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.info_tx
|
||||
.send(InfoMessage::SetLimits { new, response: tx })
|
||||
.await
|
||||
.ok();
|
||||
|
||||
rx.await.ok();
|
||||
}
|
||||
|
||||
/// Returns current queue status.
|
||||
pub async fn queue_status(&self) -> QueueStatus {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
self.info_tx
|
||||
.send(InfoMessage::QueueStatus { response: tx })
|
||||
.await
|
||||
.expect(WORKER_DIED);
|
||||
|
||||
rx.await.expect(WORKER_DIED)
|
||||
}
|
||||
}
|
||||
|
||||
/// `expect` message used when the worker future has stopped even though a
/// `Throttle` instance still exists; the worker must outlive all instances.
const WORKER_DIED: &str = "worker died before last `Throttle` instance";
|
||||
|
||||
/// Telegram request limits.
///
/// This struct is used in [`Throttle`].
///
/// Note that you may ask telegram [@BotSupport] to increase limits for your
/// particular bot if it has a lot of users (but they may or may not do that).
///
/// [@BotSupport]: https://t.me/botsupport
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Limits {
    /// Allowed messages in one chat per second.
    pub messages_per_sec_chat: u32,

    /// Allowed messages in one chat per minute.
    pub messages_per_min_chat: u32,

    /// Allowed messages per second.
    pub messages_per_sec_overall: u32,
}

/// Defaults are taken from [telegram documentation][tgdoc].
///
/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
impl Default for Limits {
    fn default() -> Self {
        // Initializers follow the field declaration order above.
        Self {
            messages_per_sec_chat: 1,
            messages_per_min_chat: 20,
            messages_per_sec_overall: 30,
        }
    }
}
|
||||
|
||||
/// Requests queue status.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct QueueStatus {
    /// Number of requests waiting to be sent.
    pub used: usize,

    /// Capacity of the queue.
    pub capacity: usize,
}

impl QueueStatus {
    /// Returns `true` if the queue is full. Note that when the queue is full
    /// the number of pending requests may exceed [`used`].
    ///
    /// [`used`]: QueueStatus::used
    pub fn is_full(&self) -> bool {
        self.capacity <= self.used
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.used == 0
    }
}
|
||||
|
||||
/// One second — the window used for per-second rate limits.
const SECOND: Duration = Duration::from_secs(1);

/// One minute — the window used for per-minute (per-chat) rate limits.
const MINUTE: Duration = Duration::from_secs(60);

// Delay between worker iterations.
//
// For now it's `second/4`, but that number is chosen pretty randomly, we may
// want to change this.
const DELAY: Duration = Duration::from_millis(250);
|
||||
|
||||
#[derive(Debug)]
|
||||
enum InfoMessage {
|
||||
GetLimits { response: Sender<Limits> },
|
||||
SetLimits { new: Limits, response: Sender<()> },
|
||||
QueueStatus { response: Sender<QueueStatus> },
|
||||
}
|
||||
|
||||
/// Count of requests sent within some time window.
type RequestsSent = u32;
|
||||
|
@ -164,7 +249,54 @@ struct RequestsSentToChats {
|
|||
per_sec: HashMap<ChatIdHash, RequestsSent>,
|
||||
}
|
||||
|
||||
async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>) {
|
||||
// Throttling is quite complicated. This comment describes the algorithm of the
|
||||
// current implementation.
|
||||
//
|
||||
// ### Request
|
||||
//
|
||||
// When a throttling request is sent, it sends a tuple of `ChatId` and
|
||||
// `Sender<()>` to the worker. Then the request waits for a notification from
|
||||
// the worker. When notification is received, it sends the underlying request.
|
||||
//
|
||||
// ### Worker
|
||||
//
|
||||
// The worker does the most important job -- it ensures that the limits are
|
||||
// never exceeded.
|
||||
//
|
||||
// The worker stores a history of requests sent in the last minute (and to which
|
||||
// chats they were sent) and a queue of pending updates.
|
||||
//
|
||||
// The worker does the following algorithm loop:
|
||||
//
|
||||
// 1. If the queue is empty, wait for the first message in incoming channel (and
|
||||
// add it to the queue).
|
||||
//
|
||||
// 2. Read all present messages from an incoming channel and transfer them to
|
||||
// the queue.
|
||||
//
|
||||
// 3. Record the current time.
|
||||
//
|
||||
// 4. Clear the history from records whose time < (current time - minute).
|
||||
//
|
||||
// 5. Count all requests which were sent last second, `allowed =
|
||||
// limit.messages_per_sec_overall - count`.
|
||||
//
|
||||
// 6. If `allowed == 0` wait a bit and `continue` to the next iteration.
|
||||
//
|
||||
// 7. Count how many requests were sent to which chats (i.e.: create
|
||||
// `Map<ChatId, Count>`). (Note: the same map, but for last minute also exists,
|
||||
// but it's updated, instead of recreation.)
|
||||
//
|
||||
// 8. While `allowed >= 0` search for requests whose chat hasn't exceeded the
|
||||
// limits (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify
|
||||
// the request that it can be now executed, increase counts, add record to the
|
||||
// history.
|
||||
|
||||
async fn worker(
|
||||
mut limits: Limits,
|
||||
mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>,
|
||||
mut info_rx: mpsc::Receiver<InfoMessage>,
|
||||
) {
|
||||
// FIXME(waffle): Do some research about data structures for this queue.
|
||||
// Currently this is O(n) removing (n = number of elements
|
||||
// stayed), amortized O(1) push (vec+vecrem).
|
||||
|
@ -177,8 +309,22 @@ async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)
|
|||
let mut rx_is_closed = false;
|
||||
|
||||
while !rx_is_closed || !queue.is_empty() {
|
||||
// FIXME(waffle):
|
||||
// 1. This call 'blocks' requests executing until all info messages are
|
||||
// answered. Crucially, info messages are rare & easy to answer, but this
|
||||
// still doesn't feel right.
|
||||
//
|
||||
// 2. If the `queue` is empty, `read_from_rx` call down below will 'block'
|
||||
// execution until a request is sent. While the execution is 'blocked' no
|
||||
// `InfoMessage`s could be answered.
|
||||
//
|
||||
// 3. If limits are decreased, ideally we want to shrink the queue.
|
||||
//
|
||||
// *blocked in an asynchronous way
|
||||
answer_info(&mut info_rx, &mut queue, &mut limits).await;
|
||||
|
||||
read_from_rx(&mut rx, &mut queue, &mut rx_is_closed).await;
|
||||
debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
|
||||
//debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
|
||||
|
||||
// _Maybe_ we need to use `spawn_blocking` here, because there is
|
||||
// decent amount of blocking work. However _for now_ I've decided not
|
||||
|
@ -287,6 +433,31 @@ async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)
|
|||
}
|
||||
}
|
||||
|
||||
async fn answer_info(
|
||||
rx: &mut mpsc::Receiver<InfoMessage>,
|
||||
queue: &mut Vec<(ChatIdHash, RequestLock)>,
|
||||
limits: &mut Limits,
|
||||
) {
|
||||
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
|
||||
while let Some(Some(req)) = rx.recv().now_or_never() {
|
||||
// Errors are ignored with .ok(). Error means that the response channel
|
||||
// is closed and the response isn't needed.
|
||||
match req {
|
||||
InfoMessage::GetLimits { response } => response.send(*limits).ok(),
|
||||
InfoMessage::SetLimits { new, response } => {
|
||||
*limits = new;
|
||||
response.send(()).ok()
|
||||
}
|
||||
InfoMessage::QueueStatus { response } => response
|
||||
.send(QueueStatus {
|
||||
used: queue.len(),
|
||||
capacity: queue.capacity(),
|
||||
})
|
||||
.ok(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_closed: &mut bool) {
|
||||
if queue.is_empty() {
|
||||
match rx.recv().await {
|
||||
|
@ -307,55 +478,6 @@ async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_c
|
|||
}
|
||||
}
|
||||
|
||||
impl<B> Throttle<B> {
|
||||
/// Creates new [`Throttle`] alongside with worker future.
|
||||
///
|
||||
/// Note: [`Throttle`] will only send requests if returned worker is
|
||||
/// polled/spawned/awaited.
|
||||
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
|
||||
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
|
||||
|
||||
let worker = worker(limits, rx);
|
||||
let this = Self { bot, queue: tx };
|
||||
|
||||
(this, worker)
|
||||
}
|
||||
|
||||
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
|
||||
///
|
||||
/// Note: it's recommended to use [`RequesterExt::throttle`] instead.
|
||||
///
|
||||
/// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle
|
||||
pub fn new_spawn(bot: B, limits: Limits) -> Self
|
||||
where
|
||||
// Basically, I hate this bound.
|
||||
// This is yet another problem caused by [rust-lang/#76882].
|
||||
// And I think it *is* a bug.
|
||||
//
|
||||
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
|
||||
//
|
||||
// Though crucially I can't think of a case with non-static bot.
|
||||
// But anyway, it doesn't change the fact that this bound is redundant.
|
||||
//
|
||||
// (waffle)
|
||||
B: 'static,
|
||||
{
|
||||
let (this, worker) = Self::new(bot, limits);
|
||||
tokio::spawn(worker);
|
||||
this
|
||||
}
|
||||
|
||||
/// Allows to access inner bot
|
||||
pub fn inner(&self) -> &B {
|
||||
&self.bot
|
||||
}
|
||||
|
||||
/// Unwraps inner bot
|
||||
pub fn into_inner(self) -> B {
|
||||
self.bot
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! f {
|
||||
($m:ident $this:ident ($($arg:ident : $T:ty),*)) => {
|
||||
ThrottlingRequest {
|
||||
|
|
Loading…
Add table
Reference in a new issue