[throttle] comment and doc improvements

Waffle 2020-10-02 16:23:49 +03:00
parent 6f03ca0954
commit 380e189ec7


@ -23,13 +23,59 @@ use crate::{
};
use std::hash::{Hash, Hasher};
const MINUTE: Duration = Duration::from_secs(50); // FIXME: min = sec * 10 only in tests
// Throttling is quite complicated; this comment describes the algorithm of the current
// implementation.
// NOTE: this only describes the CURRENT implementation, which may change at any time.
//
// ### Request
//
// When a throttled request is sent, it first sends a tuple of `ChatId` (more accurately, just a
// local `Id`) and a `Sender<()>` to the worker. Then the request waits for a notification from
// the worker. Once the notification is received, it sends the underlying request.
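//
// A rough sketch of the request side of this handshake, assuming a oneshot-style
// `Sender<()>`/`Receiver<()>` pair for the notification (all names below are illustrative,
// not the actual ones):
//
// ```
// let (tx, rx) = channel();
// worker_queue.send((chat_id, tx)).await?; // register this request with the worker
// rx.await?;                               // wait until the worker says "go"
// inner_bot.send(request).await            // only now send the underlying request
// ```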
//
// ### Worker
//
// The worker does the most important job: it checks whether the limits would be exceeded.
//
// The worker stores a "history" of requests sent in the last minute (and the chats they were
// sent to) and a queue of pending requests.
//
// The worker runs the following loop (a rough sketch in code follows this list of steps):
//
// 1. If the queue is empty, wait for the first message from the incoming channel (and add it to
//    the queue).
//
// 2. Read all messages currently in the incoming channel and move them to the queue.
//
// 3. Record current time.
//
// 4. Remove from the history all records whose time < (current - minute).
//
// 5. Count all requests which were sent in the last second; `allowed = limits.overall_s - count`.
//
// 6. If `allowed == 0`, wait a bit and `continue` to the next iteration.
//
// 7. Count how many requests were sent to each chat in the last second (i.e. build a
//    `Map<ChatId, Count>`). (Note: the same map for the last minute also exists, but it is
//    updated instead of being recreated.)
//
// 8. While `allowed >= 0`, search for a request whose chat hasn't exceeded the limits
//    (i.e. `map[chat] < limit`); if one is found, decrease `allowed`, notify the request
//    that it can now be executed, increase the counts, and add a record to the history.
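//
// A rough sketch of one iteration of this loop, assuming `Instant` timestamps in the history
// and some async `sleep` (all names below are illustrative, not the actual ones):
//
// ```
// let now = Instant::now();
//
// // 4. drop history entries older than a minute
// history.retain(|&(_, time)| time + MINUTE > now);
//
// // 5. how much of the per-second budget is left?
// let count = history.iter().filter(|&&(_, time)| time + SECOND > now).count() as u32;
// let mut allowed = limits.overall_s.saturating_sub(count);
//
// // 6. the whole budget is used up, try again a bit later
// if allowed == 0 {
//     sleep(DELAY).await;
//     continue;
// }
//
// // 7.-8. walk the queue and notify requests whose chats are still under their per-chat
// //       limits, decreasing `allowed` and updating the per-chat counts and the history
// //       as we go.
// ```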
const MINUTE: Duration = Duration::from_secs(60);
const SECOND: Duration = Duration::from_secs(1);
const DELAY: Duration = Duration::from_millis(250); // second/4
// Delay between worker iterations.
//
// For now it's `second/4`, but that number was chosen pretty arbitrarily; we may want to change it.
const DELAY: Duration = Duration::from_millis(250);
/// Telegram request limits.
///
/// This struct is used in [`Throttle`]
/// This struct is used in [`Throttle`].
///
/// Note that you may ask Telegram [@BotSupport] to increase the limits for your
/// particular bot if it has a lot of users (but they may or may not do that).
///
/// [@BotSupport]: https://t.me/botsupport
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct Limits {
/// Allowed messages in one chat per second
@ -49,9 +95,32 @@ impl Default for Limits {
}
}
/// <we need the doc here>
/// Automatic request-limit-respecting mechanism.
///
/// ## Note about send-by-@username
/// Telegram has strict [limits] which, if exceeded, will sooner or later cause
/// `RequestError::RetryAfter(_)` errors. These errors can cause users of your
/// bot to never receive responses from the bot, or to receive them in the wrong order.
///
/// This bot wrapper automatically checks for limits, suspending requests until they can be
/// sent without exceeding the limits (the order of requests within a chat is preserved).
///
/// It's recommended to use this wrapper before other wrappers (i.e.: `SomeWrapper<Throttle<Bot>>`),
/// because otherwise the inner wrappers may cause `Throttle` to miscalculate limit usage
/// (see the second example below).
///
/// [limits]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
///
/// ## Examples
///
/// ```
/// use teloxide_core::{bot::Limits, Bot, requests::RequesterExt};
///
/// let bot = Bot::new("TOKEN")
/// .throttle(Limits::default());
///
/// /* send many requests here */
/// ```
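///
/// If `Throttle` is combined with other wrappers, it would normally go first (innermost),
/// for example (assuming the `CacheMe` adaptor from `RequesterExt::cache_me` is available):
///
/// ```
/// use teloxide_core::{bot::Limits, Bot, requests::RequesterExt};
///
/// // `CacheMe<Throttle<Bot>>`: `Throttle` wraps the actual `Bot` directly.
/// let bot = Bot::new("TOKEN")
///     .throttle(Limits::default())
///     .cache_me();
/// ```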
///
/// ## Note about send-by-@channelusername
///
/// Telegram has limits on sending messages to _the same chat_. To check them,
/// we store the `chat_id`s of the last several requests. _However_, there is no good way
@ -69,8 +138,6 @@ pub struct Throttle<B> {
}
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<()>)>) {
// FIXME: remove unnecessary ChatId clones
// FIXME(waffle): do some research on data structures for this queue.
// Currently removal is O(n) (n = number of remaining elements)
// and push is amortized O(1) (a `Vec` with `Vec::remove`).
@ -222,11 +289,14 @@ impl<B> Throttle<B> {
/// Note: [`Throttle`] will only send requests if the returned worker is
/// polled/spawned/awaited.
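///
/// A rough sketch of driving the worker on a `tokio` runtime (the import path of
/// `Throttle` is a guess here and may differ):
///
/// ```
/// # async {
/// use teloxide_core::{bot::{Limits, Throttle}, Bot};
///
/// let (bot, worker) = Throttle::new(Bot::new("TOKEN"), Limits::default());
/// // If the worker is never polled, requests will never be sent.
/// tokio::spawn(worker);
/// # };
/// ```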
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
// FIXME: just a random number, currently
let (queue_tx, queue_rx) = mpsc::channel(130);
// The buffer is made slightly bigger (112.5%) than the overall limit,
// so we don't lose performance when hitting the limits.
//
// (I hope this makes sense) (waffle)
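// For example, with an overall limit of 30 requests per second this would give
// 30 + 30/8 = 33 queue slots (integer division).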
let buffer = limits.overall_s + (limits.overall_s / 8);
let (queue_tx, queue_rx) = mpsc::channel(buffer as usize);
let worker = worker(limits, queue_rx);
let this = Self { bot, queue: queue_tx };
(this, worker)
@ -514,7 +584,7 @@ mod chan_send {
}
#[pin_project::pin_project]
pub(super) struct ChanSend(#[pin] Inner); // FIXME
pub(super) struct ChanSend(#[pin] Inner);
#[cfg(not(feature = "nightly"))]
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(Id, Sender<()>)>>>>>;