diff --git a/src/adaptors/throttle.rs b/src/adaptors/throttle.rs index 38a2c60d..caaef945 100644 --- a/src/adaptors/throttle.rs +++ b/src/adaptors/throttle.rs @@ -1,31 +1,34 @@ +/// `ThrottlingRequest` and `ThrottlingSend` structures +mod request; +/// Lock that allows requests to wait until they are allowed to be sent +mod request_lock; +/// `impl Requester for Throttle<_>` +mod requester_impl; +/// `Settings` and `Limits` structures +mod settings; +/// "Worker" that checks the limits +mod worker; + use std::{ - collections::{hash_map::Entry, HashMap, VecDeque}, future::Future, hash::{Hash, Hasher}, - pin::Pin, - sync::Arc, - time::{Duration, Instant}, }; -use either::Either; -use futures::{ - future::{ready, BoxFuture}, - task::{Context, Poll}, -}; use tokio::sync::{ mpsc, - mpsc::error::TryRecvError, - oneshot::{self, Receiver, Sender}, + oneshot::{self}, }; -use url::Url; -use vecrem::VecExt; -use crate::{ - errors::AsResponseParameters, - requests::{HasPayload, Output, Request, Requester}, - types::*, +use crate::{errors::AsResponseParameters, requests::Requester, types::*}; + +use self::{ + request_lock::{channel, RequestLock}, + worker::{worker, FreezeUntil, InfoMessage}, }; +pub use request::{ThrottlingRequest, ThrottlingSend}; +pub use settings::{Limits, Settings}; + /// Automatic request limits respecting mechanism. /// /// Telegram has strict [limits], which, if exceeded will sooner or later cause @@ -47,7 +50,8 @@ use crate::{ /// ```no_run (throttle fails to spawn task without tokio runtime) /// use teloxide_core::{adaptors::throttle::Limits, requests::RequesterExt, Bot}; /// -/// let bot = Bot::new("TOKEN").throttle(Limits::default()); +/// let bot = Bot::new("TOKEN") +/// .throttle(Limits::default()); /// /// /* send many requests here */ /// ``` @@ -154,6 +158,8 @@ impl Throttle { /// Returns currently used [`Limits`]. pub async fn limits(&self) -> Limits { + const WORKER_DIED: &str = "worker died before last `Throttle` instance"; + let (tx, rx) = oneshot::channel(); self.info_tx @@ -179,592 +185,6 @@ impl Throttle { } } -/// Telegram request limits. -/// -/// This struct is used in [`Throttle`]. -/// -/// Note that you may ask telegram [@BotSupport] to increase limits for your -/// particular bot if it has a lot of users (but they may or may not do that). -/// -/// [@BotSupport]: https://t.me/botsupport -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -pub struct Limits { - /// Allowed messages in one chat per second. - pub messages_per_sec_chat: u32, - - /// Allowed messages in one chat per minute. - pub messages_per_min_chat: u32, - - /// Allowed messages in one channel per minute. - pub messages_per_min_channel: u32, - - /// Allowed messages per second. - pub messages_per_sec_overall: u32, -} - -/// Defaults are taken from [telegram documentation][tgdoc] (except for -/// `messages_per_min_channel`). -/// -/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this -impl Default for Limits { - fn default() -> Self { - Self { - messages_per_sec_chat: 1, - messages_per_sec_overall: 30, - messages_per_min_chat: 20, - messages_per_min_channel: 10, - } - } -} - -/// Settings used by [`Throttle`] adaptor. -/// -/// ## Examples -/// -/// ``` -/// use teloxide_core::adaptors::throttle; -/// -/// let settings = throttle::Settings::default() -/// .on_queue_full(|pending| async move { /* do something when internal queue is full */ }); -/// // use settings in `Throttle::with_settings` or other constructors -/// # let _ = settings; -/// ``` -#[non_exhaustive] -pub struct Settings { - pub limits: Limits, - pub on_queue_full: BoxedFnMut, - pub retry: bool, - pub check_slow_mode: bool, -} - -impl Settings { - pub fn limits(mut self, val: Limits) -> Self { - self.limits = val; - self - } - - pub fn on_queue_full(mut self, mut val: F) -> Self - where - F: FnMut(usize) -> Fut + Send + 'static, - Fut: Future + Send + 'static, - { - self.on_queue_full = Box::new(move |pending| Box::pin(val(pending))); - self - } - - pub fn no_retry(mut self) -> Self { - self.retry = false; - self - } - - pub fn check_slow_mode(mut self) -> Self { - self.check_slow_mode = true; - self - } -} - -impl Default for Settings { - fn default() -> Self { - Self { - limits: <_>::default(), - on_queue_full: Box::new(|pending| { - log::warn!("Throttle queue is full ({} pending requests)", pending); - Box::pin(ready(())) - }), - retry: true, - check_slow_mode: false, - } - } -} - -// Required to not trigger `clippy::type-complexity` lint -type BoxedFnMut = Box O + Send>; -type BoxedFuture = Pin + Send>>; - -const WORKER_DIED: &str = "worker died before last `Throttle` instance"; - -const MINUTE: Duration = Duration::from_secs(60); -const SECOND: Duration = Duration::from_secs(1); - -// Delay between worker iterations. -// -// For now it's `second/4`, but that number is chosen pretty randomly, we may -// want to change this. -const DELAY: Duration = Duration::from_millis(250); - -/// Minimal time between calls to queue_full function -const QUEUE_FULL_DELAY: Duration = Duration::from_secs(4); - -#[derive(Debug)] -enum InfoMessage { - GetLimits { response: Sender }, - SetLimits { new: Limits, response: Sender<()> }, -} - -type RequestsSent = u32; - -// I wish there was special data structure for history which removed the -// need in 2 hashmaps -// (waffle) -#[derive(Default)] -struct RequestsSentToChats { - per_min: HashMap, - per_sec: HashMap, -} - -struct FreezeUntil { - until: Instant, - after: Duration, - chat: ChatIdHash, - retry: Option, -} - -// Throttling is quite complicated. This comment describes the algorithm of the -// current implementation. -// -// ### Request -// -// When a throttling request is sent, it sends a tuple of `ChatId` and -// `Sender<()>` to the worker. Then the request waits for a notification from -// the worker. When notification is received, it sends the underlying request. -// -// ### Worker -// -// The worker does the most important job -- it ensures that the limits are -// never exceeded. -// -// The worker stores a history of requests sent in the last minute (and to which -// chats they were sent) and a queue of pending updates. -// -// The worker does the following algorithm loop: -// -// 1. If the queue is empty, wait for the first message in incoming channel (and -// add it to the queue). -// -// 2. Read all present messages from an incoming channel and transfer them to -// the queue. -// -// 3. Record the current time. -// -// 4. Clear the history from records whose time < (current time - minute). -// -// 5. Count all requests which were sent last second, `allowed = -// limit.messages_per_sec_overall - count`. -// -// 6. If `allowed == 0` wait a bit and `continue` to the next iteration. -// -// 7. Count how many requests were sent to which chats (i.e.: create -// `Map`). (Note: the same map, but for last minute also exists, -// but it's updated, instead of recreation.) -// -// 8. While `allowed >= 0` search for requests which chat haven't exceed the -// limits (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify -// the request that it can be now executed, increase counts, add record to the -// history. - -async fn worker( - Settings { - mut limits, - mut on_queue_full, - retry, - check_slow_mode, - }: Settings, - mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>, - mut info_rx: mpsc::Receiver, - bot: B, -) where - B: Requester, - B::Err: AsResponseParameters, -{ - // FIXME(waffle): Make an research about data structures for this queue. - // Currently this is O(n) removing (n = number of elements - // stayed), amortized O(1) push (vec+vecrem). - let mut queue: Vec<(ChatIdHash, RequestLock)> = - Vec::with_capacity(limits.messages_per_sec_overall as usize); - - let mut history: VecDeque<(ChatIdHash, Instant)> = VecDeque::new(); - let mut requests_sent = RequestsSentToChats::default(); - - let mut slow_mode: Option> = - check_slow_mode.then(HashMap::new); - - let mut rx_is_closed = false; - - let mut last_queue_full = Instant::now() - .checked_sub(QUEUE_FULL_DELAY) - .unwrap_or_else(Instant::now); - - let (freeze_tx, mut freeze_rx) = mpsc::channel::(1); - - while !rx_is_closed || !queue.is_empty() { - // FIXME(waffle): - // 1. If the `queue` is empty, `read_from_rx` call down below will 'block' - // execution until a request is sent. While the execution is 'blocked' no - // `InfoMessage`s could be answered. - // - // 2. If limits are decreased, ideally we want to shrink queue. - // - // *blocked in asynchronous way - answer_info(&mut info_rx, &mut limits); - - freeze( - &mut freeze_rx, - &freeze_tx, - slow_mode.as_mut(), - &mut queue, - &bot, - None, - ) - .await; - - loop { - tokio::select! { - () = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break, - freeze_until = freeze_rx.recv() => { - freeze( - &mut freeze_rx, - &freeze_tx, - slow_mode.as_mut(), - &mut queue, - &bot, - freeze_until - ) - .await; - }, - } - } - //debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize); - - if queue.len() == queue.capacity() && last_queue_full.elapsed() > QUEUE_FULL_DELAY { - last_queue_full = Instant::now(); - tokio::spawn(on_queue_full(queue.len())); - } - - // _Maybe_ we need to use `spawn_blocking` here, because there is - // decent amount of blocking work. However _for now_ I've decided not - // to use it here. - // - // Reasons (not to use `spawn_blocking`): - // - // 1. The work seems not very CPU-bound, it's not heavy computations, - // it's more like light computations. - // - // 2. `spawn_blocking` is not zero-cost — it spawns a new system thread - // + do so other work. This may actually be *worse* then current - // "just do everything in this async fn" approach. - // - // 3. With `rt-threaded` feature, tokio uses [`num_cpus()`] threads - // which should be enough to work fine with one a-bit-blocking task. - // Crucially current behaviour will be problem mostly with - // single-threaded runtimes (and in case you're using one, you - // probably don't want to spawn unnecessary threads anyway). - // - // I think if we'll ever change this behaviour, we need to make it - // _configurable_. - // - // See also [discussion (ru)]. - // - // NOTE: If you are reading this because you have any problems because - // of this worker, open an [issue on github] - // - // [`num_cpus()`]: https://vee.gg/JGwq2 - // [discussion (ru)]: https://t.me/rust_async/27891 - // [issue on github]: https://github.com/teloxide/teloxide/issues/new - // - // (waffle) - - let now = Instant::now(); - let min_back = now - MINUTE; - let sec_back = now - SECOND; - - // make history and requests_sent up-to-date - while let Some((_, time)) = history.front() { - // history is sorted, we found first up-to-date thing - if time >= &min_back { - break; - } - - if let Some((chat, _)) = history.pop_front() { - let entry = requests_sent.per_min.entry(chat).and_modify(|count| { - *count -= 1; - }); - - if let Entry::Occupied(entry) = entry { - if *entry.get() == 0 { - entry.remove_entry(); - } - } - } - } - - // as truncates which is ok since in case of truncation it would always be >= - // limits.overall_s - let used = history - .iter() - .take_while(|(_, time)| time > &sec_back) - .count() as u32; - let mut allowed = limits.messages_per_sec_overall.saturating_sub(used); - - if allowed == 0 { - requests_sent.per_sec.clear(); - tokio::time::sleep(DELAY).await; - continue; - } - - for (chat, _) in history.iter().take_while(|(_, time)| time > &sec_back) { - *requests_sent.per_sec.entry(*chat).or_insert(0) += 1; - } - - let mut queue_removing = queue.removing(); - - while let Some(entry) = queue_removing.next() { - let chat = &entry.value().0; - - let slow_mode = slow_mode.as_mut().and_then(|sm| sm.get_mut(chat)); - - if let Some(&mut (delay, last)) = slow_mode { - if last + delay > Instant::now() { - continue; - } - } - - let requests_sent_per_sec_count = requests_sent.per_sec.get(chat).copied().unwrap_or(0); - let requests_sent_per_min_count = requests_sent.per_min.get(chat).copied().unwrap_or(0); - - let messages_per_min_limit = if chat.is_channel() { - limits.messages_per_min_channel - } else { - limits.messages_per_min_chat - }; - - let limits_not_exceeded = requests_sent_per_sec_count < limits.messages_per_sec_chat - && requests_sent_per_min_count < messages_per_min_limit; - - if limits_not_exceeded { - // Unlock the associated request. - - let chat = *chat; - let (_, lock) = entry.remove(); - - // Only count request as sent if the request wasn't dropped before unlocked - if lock.unlock(retry, freeze_tx.clone()).is_ok() { - *requests_sent.per_sec.entry(chat).or_insert(0) += 1; - *requests_sent.per_min.entry(chat).or_insert(0) += 1; - history.push_back((chat, Instant::now())); - - if let Some((_, last)) = slow_mode { - *last = Instant::now(); - } - - // We have "sent" one request, so now we can send one less. - allowed -= 1; - if allowed == 0 { - break; - } - } - } - } - - // It's easier to just recompute last second stats, instead of keeping - // track of it alongside with minute stats, so we just throw this away. - requests_sent.per_sec.clear(); - tokio::time::sleep(DELAY).await; - } -} - -fn answer_info(rx: &mut mpsc::Receiver, limits: &mut Limits) { - while let Ok(req) = rx.try_recv() { - // Errors are ignored with .ok(). Error means that the response channel - // is closed and the response isn't needed. - match req { - InfoMessage::GetLimits { response } => response.send(*limits).ok(), - InfoMessage::SetLimits { new, response } => { - *limits = new; - response.send(()).ok() - } - }; - } -} - -async fn freeze( - rx: &mut mpsc::Receiver, - tx: &mpsc::Sender, - mut slow_mode: Option<&mut HashMap>, - queue: &mut Vec<(ChatIdHash, RequestLock)>, - bot: &impl Requester, - mut imm: Option, -) { - while let Some(freeze_until) = imm.take().or_else(|| rx.try_recv().ok()) { - let FreezeUntil { - until, - after, - chat, - mut retry, - } = freeze_until; - - if let Some(slow_mode) = slow_mode.as_deref_mut() { - // TODO: do something with channels?... - if let hash @ ChatIdHash::Id(id) = chat { - // TODO: maybe not call `get_chat` every time? - - // At this point there isn't much we can do with the error besides ignoring - if let Ok(chat) = bot.get_chat(id).send().await { - match chat.slow_mode_delay() { - Some(delay) => { - let now = Instant::now(); - let new_delay = Duration::from_secs(delay.into()); - slow_mode.insert(hash, (new_delay, now)); - } - None => { - slow_mode.remove(&hash); - } - }; - } - } - } - - // slow mode is enabled and it is <= to the delay asked by telegram - let slow_mode_enabled_and_likely_the_cause = slow_mode - .as_ref() - .and_then(|m| m.get(&chat).map(|(delay, _)| delay <= &after)) - .unwrap_or(false); - - // Do not sleep if slow mode is enabled since the freeze is most likely caused - // by the said slow mode and not by the global limits. - if slow_mode_enabled_and_likely_the_cause { - queue.extend(Some(chat).zip(retry.take())); - } else { - log::warn!( - "freezing the bot for approximately {:?} due to `RetryAfter` error from telegram", - after - ); - - tokio::time::sleep_until(until.into()).await; - - log::warn!("unfreezing the bot"); - - if let Some(lock) = retry { - // Since we are already retrying the request, retries are obviously turned on. - let retry = true; - let _ = lock.unlock(retry, tx.clone()); - } - } - } -} - -async fn read_from_rx(rx: &mut mpsc::Receiver, queue: &mut Vec, rx_is_closed: &mut bool) { - if queue.is_empty() { - log::warn!("A-blocking on queue"); - match rx.recv().await { - Some(req) => queue.push(req), - None => *rx_is_closed = true, - } - } - - // Don't grow queue bigger than the capacity to limit DOS possibility - while queue.len() < queue.capacity() { - match rx.try_recv() { - Ok(req) => queue.push(req), - Err(TryRecvError::Disconnected) => *rx_is_closed = true, - // There are no items in queue. - Err(TryRecvError::Empty) => break, - } - } -} - -macro_rules! f { - ($m:ident $this:ident ($($arg:ident : $T:ty),*)) => { - ThrottlingRequest { - request: Arc::new($this.inner().$m($($arg),*)), - chat_id: |p| (&p.payload_ref().chat_id).into(), - worker: $this.queue.clone(), - } - }; -} - -macro_rules! fty { - ($T:ident) => { - ThrottlingRequest - }; -} - -macro_rules! fid { - ($m:ident $this:ident ($($arg:ident : $T:ty),*)) => { - $this.inner().$m($($arg),*) - }; -} - -macro_rules! ftyid { - ($T:ident) => { - B::$T - }; -} - -impl Requester for Throttle -where - B::Err: AsResponseParameters, - - B::SendMessage: Clone + Send + Sync + 'static, - B::ForwardMessage: Clone + Send + Sync + 'static, - B::CopyMessage: Clone + Send + Sync + 'static, - B::SendPhoto: Clone + Send + Sync + 'static, - B::SendAudio: Clone + Send + Sync + 'static, - B::SendDocument: Clone + Send + Sync + 'static, - B::SendVideo: Clone + Send + Sync + 'static, - B::SendAnimation: Clone + Send + Sync + 'static, - B::SendVoice: Clone + Send + Sync + 'static, - B::SendVideoNote: Clone + Send + Sync + 'static, - B::SendMediaGroup: Clone + Send + Sync + 'static, - B::SendLocation: Clone + Send + Sync + 'static, - B::SendVenue: Clone + Send + Sync + 'static, - B::SendContact: Clone + Send + Sync + 'static, - B::SendPoll: Clone + Send + Sync + 'static, - B::SendDice: Clone + Send + Sync + 'static, - B::SendSticker: Clone + Send + Sync + 'static, - B::SendInvoice: Clone + Send + Sync + 'static, -{ - type Err = B::Err; - - requester_forward! { - send_message, forward_message, copy_message, send_photo, send_audio, - send_document, send_video, send_animation, send_voice, send_video_note, - send_media_group, send_location, send_venue, send_contact, send_poll, - send_dice, send_sticker, send_invoice => f, fty - } - - requester_forward! { - get_me, log_out, close, get_updates, set_webhook, delete_webhook, get_webhook_info, - edit_message_live_location, edit_message_live_location_inline, - stop_message_live_location, stop_message_live_location_inline, - send_chat_action, get_user_profile_photos, get_file, kick_chat_member, ban_chat_member, - unban_chat_member, restrict_chat_member, promote_chat_member, - set_chat_administrator_custom_title, - ban_chat_sender_chat, unban_chat_sender_chat, set_chat_permissions, - export_chat_invite_link, create_chat_invite_link, edit_chat_invite_link, - revoke_chat_invite_link, set_chat_photo, delete_chat_photo, set_chat_title, - set_chat_description, pin_chat_message, unpin_chat_message, unpin_all_chat_messages, - leave_chat, get_chat, get_chat_administrators, get_chat_members_count, get_chat_member_count, - get_chat_member, set_chat_sticker_set, delete_chat_sticker_set, - answer_callback_query, set_my_commands, get_my_commands, delete_my_commands, answer_inline_query, - edit_message_text, edit_message_text_inline, edit_message_caption, - edit_message_caption_inline, edit_message_media, edit_message_media_inline, - edit_message_reply_markup, edit_message_reply_markup_inline, stop_poll, - delete_message, get_sticker_set, upload_sticker_file, create_new_sticker_set, - add_sticker_to_set, set_sticker_position_in_set, delete_sticker_from_set, - set_sticker_set_thumb, answer_shipping_query, answer_pre_checkout_query, - set_passport_data_errors, send_game, set_game_score, set_game_score_inline, - approve_chat_join_request, decline_chat_join_request, - get_game_high_scores => fid, ftyid - } -} - -download_forward! { - 'w - B - Throttle - { this => this.inner() } -} - /// An ID used in the worker. /// /// It is used instead of `ChatId` to make copying cheap even in case of @@ -797,174 +217,3 @@ impl From<&Recipient> for ChatIdHash { } } } - -#[must_use = "Requests are lazy and do nothing unless sent"] -pub struct ThrottlingRequest { - request: Arc, - chat_id: fn(&R::Payload) -> ChatIdHash, - worker: mpsc::Sender<(ChatIdHash, RequestLock)>, -} - -impl HasPayload for ThrottlingRequest { - type Payload = R::Payload; - - /// Note that if this request was already executed via `send_ref` and it - /// didn't yet completed, this method will clone the underlying request - fn payload_mut(&mut self) -> &mut Self::Payload { - Arc::make_mut(&mut self.request).payload_mut() - } - - fn payload_ref(&self) -> &Self::Payload { - self.request.payload_ref() - } -} - -impl Request for ThrottlingRequest -where - R: Request + Clone + Send + Sync + 'static, // TODO: rem static - R::Err: AsResponseParameters + Send, - Output: Send, -{ - type Err = R::Err; - type Send = ThrottlingSend; - type SendRef = ThrottlingSend; - - fn send(self) -> Self::Send { - let chat = (self.chat_id)(self.payload_ref()); - let request = Either::from(Arc::try_unwrap(self.request)); - - ThrottlingSend(Box::pin(send(request, chat, self.worker))) - } - - fn send_ref(&self) -> Self::SendRef { - let chat = (self.chat_id)(self.payload_ref()); - let request = Either::Left(Arc::clone(&self.request)); - - ThrottlingSend(Box::pin(send(request, chat, self.worker.clone()))) - } -} - -#[pin_project::pin_project] -pub struct ThrottlingSend(#[pin] BoxFuture<'static, Result, R::Err>>); - -impl Future for ThrottlingSend -where - R::Err: AsResponseParameters, -{ - type Output = Result, R::Err>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.as_mut().project().0.poll(cx) - } -} - -/// Actual implementation of the `ThrottlingSend` future -async fn send( - request: Either, R>, - chat: ChatIdHash, - worker: mpsc::Sender<(ChatIdHash, RequestLock)>, -) -> Result, R::Err> -where - R: Request + Send + Sync + 'static, - R::Err: AsResponseParameters + Send, - Output: Send, -{ - // We use option to `take` when sending by value. - // - // All unwraps down below will succed because we always return immediately after - // taking. - let mut request: Either, Option> = request.map_right(Some); - - loop { - let (lock, wait) = channel(); - - // The worker is unlikely to drop queue before sending all requests, - // but just in case it has dropped the queue, we want to just send the - // request. - if let Err(_) = worker.send((chat, lock)).await { - return match &mut request { - Either::Left(shared) => shared.send_ref().await, - Either::Right(owned) => owned.take().unwrap().send().await, - }; - }; - - let (retry, freeze) = wait.await; - - let res = match (retry, &mut request) { - (true, request) => { - request - .as_ref() - .either(|r| &**r, |r| r.as_ref().unwrap()) - .send_ref() - .await - } - (false, Either::Left(shared)) => shared.send_ref().await, - (false, Either::Right(owned)) => owned.take().unwrap().send().await, - }; - - let retry_after = res.as_ref().err().and_then(<_>::retry_after); - if let Some(retry_after) = retry_after { - let after = Duration::from_secs(retry_after.into()); - - if retry { - log::warn!("Freezing, before retrying: {}", retry_after); - } - - let (lock, wait) = channel(); - - // Error here means that the worker died, so we can't really do anything about - // it - let _ = freeze - .send(FreezeUntil { - until: Instant::now(), // TODO: this is obviously wrong - after, - chat, - retry: Some(lock), - }) - .await; - - wait.await; - } - - match res { - res @ Ok(_) => break res, - res @ Err(_) if !retry => break res, - Err(_) => { - // Next iteration will retry - } - }; - } -} - -fn channel() -> (RequestLock, RequestWaiter) { - let (tx, rx) = oneshot::channel(); - let tx = RequestLock(tx); - let rx = RequestWaiter(rx); - (tx, rx) -} - -#[must_use] -struct RequestLock(Sender<(bool, mpsc::Sender)>); - -impl RequestLock { - fn unlock(self, retry: bool, freeze: mpsc::Sender) -> Result<(), ()> { - self.0.send((retry, freeze)).map_err(drop) - } -} - -#[must_use] -#[pin_project::pin_project] -struct RequestWaiter(#[pin] Receiver<(bool, mpsc::Sender)>); - -impl Future for RequestWaiter { - type Output = (bool, mpsc::Sender); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.0.poll(cx) { - Poll::Ready(Ok(ret)) => Poll::Ready(ret), - Poll::Ready(Err(_)) => panic!("`RequestLock` is dropped by the throttle worker"), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/src/adaptors/throttle/request.rs b/src/adaptors/throttle/request.rs new file mode 100644 index 00000000..424a6950 --- /dev/null +++ b/src/adaptors/throttle/request.rs @@ -0,0 +1,157 @@ +use std::{ + future::Future, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; + +use either::Either; +use futures::{ + future::BoxFuture, + task::{Context, Poll}, +}; +use tokio::sync::mpsc; + +use crate::{ + adaptors::throttle::{channel, ChatIdHash, FreezeUntil, RequestLock}, + errors::AsResponseParameters, + requests::{HasPayload, Output, Request}, +}; + +#[must_use = "Requests are lazy and do nothing unless sent"] +pub struct ThrottlingRequest { + pub(super) request: Arc, + pub(super) chat_id: fn(&R::Payload) -> ChatIdHash, + pub(super) worker: mpsc::Sender<(ChatIdHash, RequestLock)>, +} + +#[pin_project::pin_project] +pub struct ThrottlingSend(#[pin] BoxFuture<'static, Result, R::Err>>); + +impl HasPayload for ThrottlingRequest { + type Payload = R::Payload; + + /// Note that if this request was already executed via `send_ref` and it + /// didn't yet completed, this method will clone the underlying request + fn payload_mut(&mut self) -> &mut Self::Payload { + Arc::make_mut(&mut self.request).payload_mut() + } + + fn payload_ref(&self) -> &Self::Payload { + self.request.payload_ref() + } +} + +impl Request for ThrottlingRequest +where + R: Request + Clone + Send + Sync + 'static, // TODO: rem static + R::Err: AsResponseParameters + Send, + Output: Send, +{ + type Err = R::Err; + type Send = ThrottlingSend; + type SendRef = ThrottlingSend; + + fn send(self) -> Self::Send { + let chat = (self.chat_id)(self.payload_ref()); + let request = Either::from(Arc::try_unwrap(self.request)); + + ThrottlingSend(Box::pin(send(request, chat, self.worker))) + } + + fn send_ref(&self) -> Self::SendRef { + let chat = (self.chat_id)(self.payload_ref()); + let request = Either::Left(Arc::clone(&self.request)); + + ThrottlingSend(Box::pin(send(request, chat, self.worker.clone()))) + } +} + +impl Future for ThrottlingSend +where + R::Err: AsResponseParameters, +{ + type Output = Result, R::Err>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().project().0.poll(cx) + } +} + +/// Actual implementation of the `ThrottlingSend` future +async fn send( + request: Either, R>, + chat: ChatIdHash, + worker: mpsc::Sender<(ChatIdHash, RequestLock)>, +) -> Result, R::Err> +where + R: Request + Send + Sync + 'static, + R::Err: AsResponseParameters + Send, + Output: Send, +{ + // We use option to `take` when sending by value. + // + // All unwraps down below will succed because we always return immediately after + // taking. + let mut request: Either, Option> = request.map_right(Some); + + loop { + let (lock, wait) = channel(); + + // The worker is unlikely to drop queue before sending all requests, + // but just in case it has dropped the queue, we want to just send the + // request. + if let Err(_) = worker.send((chat, lock)).await { + return match &mut request { + Either::Left(shared) => shared.send_ref().await, + Either::Right(owned) => owned.take().unwrap().send().await, + }; + }; + + let (retry, freeze) = wait.await; + + let res = match (retry, &mut request) { + (true, request) => { + request + .as_ref() + .either(|r| &**r, |r| r.as_ref().unwrap()) + .send_ref() + .await + } + (false, Either::Left(shared)) => shared.send_ref().await, + (false, Either::Right(owned)) => owned.take().unwrap().send().await, + }; + + let retry_after = res.as_ref().err().and_then(<_>::retry_after); + if let Some(retry_after) = retry_after { + let after = Duration::from_secs(retry_after.into()); + + if retry { + log::warn!("Freezing, before retrying: {}", retry_after); + } + + let (lock, wait) = channel(); + + // Error here means that the worker died, so we can't really do anything about + // it + let _ = freeze + .send(FreezeUntil { + until: Instant::now(), // TODO: this is obviously wrong + after, + chat, + retry: Some(lock), + }) + .await; + + wait.await; + } + + match res { + res @ Ok(_) => break res, + res @ Err(_) if !retry => break res, + Err(_) => { + // Next iteration will retry + } + }; + } +} diff --git a/src/adaptors/throttle/request_lock.rs b/src/adaptors/throttle/request_lock.rs new file mode 100644 index 00000000..e21093c2 --- /dev/null +++ b/src/adaptors/throttle/request_lock.rs @@ -0,0 +1,45 @@ +use std::pin::Pin; + +use futures::{ + task::{Context, Poll}, + Future, +}; +use tokio::sync::{ + mpsc, + oneshot::{self, Receiver, Sender}, +}; + +use crate::adaptors::throttle::FreezeUntil; + +pub(super) fn channel() -> (RequestLock, RequestWaiter) { + let (tx, rx) = oneshot::channel(); + let tx = RequestLock(tx); + let rx = RequestWaiter(rx); + (tx, rx) +} + +#[must_use] +pub(super) struct RequestLock(Sender<(bool, mpsc::Sender)>); + +#[must_use] +#[pin_project::pin_project] +pub(super) struct RequestWaiter(#[pin] Receiver<(bool, mpsc::Sender)>); + +impl RequestLock { + pub(super) fn unlock(self, retry: bool, freeze: mpsc::Sender) -> Result<(), ()> { + self.0.send((retry, freeze)).map_err(drop) + } +} + +impl Future for RequestWaiter { + type Output = (bool, mpsc::Sender); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.0.poll(cx) { + Poll::Ready(Ok(ret)) => Poll::Ready(ret), + Poll::Ready(Err(_)) => panic!("`RequestLock` is dropped by the throttle worker"), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/adaptors/throttle/requester_impl.rs b/src/adaptors/throttle/requester_impl.rs new file mode 100644 index 00000000..a3c45e42 --- /dev/null +++ b/src/adaptors/throttle/requester_impl.rs @@ -0,0 +1,103 @@ +use std::sync::Arc; + +use url::Url; + +use crate::{ + adaptors::{throttle::ThrottlingRequest, Throttle}, + errors::AsResponseParameters, + requests::{HasPayload, Requester}, + types::*, +}; + +macro_rules! f { + ($m:ident $this:ident ($($arg:ident : $T:ty),*)) => { + ThrottlingRequest { + request: Arc::new($this.inner().$m($($arg),*)), + chat_id: |p| (&p.payload_ref().chat_id).into(), + worker: $this.queue.clone(), + } + }; +} + +macro_rules! fty { + ($T:ident) => { + ThrottlingRequest + }; +} + +macro_rules! fid { + ($m:ident $this:ident ($($arg:ident : $T:ty),*)) => { + $this.inner().$m($($arg),*) + }; +} + +macro_rules! ftyid { + ($T:ident) => { + B::$T + }; +} + +impl Requester for Throttle +where + B::Err: AsResponseParameters, + + B::SendMessage: Clone + Send + Sync + 'static, + B::ForwardMessage: Clone + Send + Sync + 'static, + B::CopyMessage: Clone + Send + Sync + 'static, + B::SendPhoto: Clone + Send + Sync + 'static, + B::SendAudio: Clone + Send + Sync + 'static, + B::SendDocument: Clone + Send + Sync + 'static, + B::SendVideo: Clone + Send + Sync + 'static, + B::SendAnimation: Clone + Send + Sync + 'static, + B::SendVoice: Clone + Send + Sync + 'static, + B::SendVideoNote: Clone + Send + Sync + 'static, + B::SendMediaGroup: Clone + Send + Sync + 'static, + B::SendLocation: Clone + Send + Sync + 'static, + B::SendVenue: Clone + Send + Sync + 'static, + B::SendContact: Clone + Send + Sync + 'static, + B::SendPoll: Clone + Send + Sync + 'static, + B::SendDice: Clone + Send + Sync + 'static, + B::SendSticker: Clone + Send + Sync + 'static, + B::SendInvoice: Clone + Send + Sync + 'static, +{ + type Err = B::Err; + + requester_forward! { + send_message, forward_message, copy_message, send_photo, send_audio, + send_document, send_video, send_animation, send_voice, send_video_note, + send_media_group, send_location, send_venue, send_contact, send_poll, + send_dice, send_sticker, send_invoice => f, fty + } + + requester_forward! { + get_me, log_out, close, get_updates, set_webhook, delete_webhook, get_webhook_info, + edit_message_live_location, edit_message_live_location_inline, + stop_message_live_location, stop_message_live_location_inline, + send_chat_action, get_user_profile_photos, get_file, kick_chat_member, ban_chat_member, + unban_chat_member, restrict_chat_member, promote_chat_member, + set_chat_administrator_custom_title, + ban_chat_sender_chat, unban_chat_sender_chat, set_chat_permissions, + export_chat_invite_link, create_chat_invite_link, edit_chat_invite_link, + revoke_chat_invite_link, set_chat_photo, delete_chat_photo, set_chat_title, + set_chat_description, pin_chat_message, unpin_chat_message, unpin_all_chat_messages, + leave_chat, get_chat, get_chat_administrators, get_chat_members_count, get_chat_member_count, + get_chat_member, set_chat_sticker_set, delete_chat_sticker_set, + answer_callback_query, set_my_commands, get_my_commands, delete_my_commands, answer_inline_query, + edit_message_text, edit_message_text_inline, edit_message_caption, + edit_message_caption_inline, edit_message_media, edit_message_media_inline, + edit_message_reply_markup, edit_message_reply_markup_inline, stop_poll, + delete_message, get_sticker_set, upload_sticker_file, create_new_sticker_set, + add_sticker_to_set, set_sticker_position_in_set, delete_sticker_from_set, + set_sticker_set_thumb, answer_shipping_query, answer_pre_checkout_query, + set_passport_data_errors, send_game, set_game_score, set_game_score_inline, + approve_chat_join_request, decline_chat_join_request, + get_game_high_scores => fid, ftyid + } +} + +download_forward! { + 'w + B + Throttle + { this => this.inner() } +} diff --git a/src/adaptors/throttle/settings.rs b/src/adaptors/throttle/settings.rs new file mode 100644 index 00000000..82ef7e4a --- /dev/null +++ b/src/adaptors/throttle/settings.rs @@ -0,0 +1,105 @@ +use std::pin::Pin; + +use futures::{future::ready, Future}; + +// Required to not trigger `clippy::type-complexity` lint +type BoxedFnMut = Box O + Send>; +type BoxedFuture = Pin + Send>>; + +/// Settings used by [`Throttle`] adaptor. +/// +/// ## Examples +/// +/// ``` +/// use teloxide_core::adaptors::throttle; +/// +/// let settings = throttle::Settings::default() +/// .on_queue_full(|pending| async move { /* do something when internal queue is full */ }); +/// // use settings in `Throttle::with_settings` or other constructors +/// # let _ = settings; +/// ``` +#[non_exhaustive] +pub struct Settings { + pub limits: Limits, + pub on_queue_full: BoxedFnMut, + pub retry: bool, + pub check_slow_mode: bool, +} + +/// Telegram request limits. +/// +/// This struct is used in [`Throttle`]. +/// +/// Note that you may ask telegram [@BotSupport] to increase limits for your +/// particular bot if it has a lot of users (but they may or may not do that). +/// +/// [@BotSupport]: https://t.me/botsupport +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +pub struct Limits { + /// Allowed messages in one chat per second. + pub messages_per_sec_chat: u32, + + /// Allowed messages in one chat per minute. + pub messages_per_min_chat: u32, + + /// Allowed messages in one channel per minute. + pub messages_per_min_channel: u32, + + /// Allowed messages per second. + pub messages_per_sec_overall: u32, +} + +impl Settings { + pub fn limits(mut self, val: Limits) -> Self { + self.limits = val; + self + } + + pub fn on_queue_full(mut self, mut val: F) -> Self + where + F: FnMut(usize) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + self.on_queue_full = Box::new(move |pending| Box::pin(val(pending))); + self + } + + pub fn no_retry(mut self) -> Self { + self.retry = false; + self + } + + pub fn check_slow_mode(mut self) -> Self { + self.check_slow_mode = true; + self + } +} + +impl Default for Settings { + fn default() -> Self { + Self { + limits: <_>::default(), + on_queue_full: Box::new(|pending| { + log::warn!("Throttle queue is full ({} pending requests)", pending); + Box::pin(ready(())) + }), + retry: true, + check_slow_mode: false, + } + } +} + +/// Defaults are taken from [telegram documentation][tgdoc] (except for +/// `messages_per_min_channel`). +/// +/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this +impl Default for Limits { + fn default() -> Self { + Self { + messages_per_sec_chat: 1, + messages_per_sec_overall: 30, + messages_per_min_chat: 20, + messages_per_min_channel: 10, + } + } +} diff --git a/src/adaptors/throttle/worker.rs b/src/adaptors/throttle/worker.rs new file mode 100644 index 00000000..eae5961f --- /dev/null +++ b/src/adaptors/throttle/worker.rs @@ -0,0 +1,401 @@ +use std::{ + collections::{hash_map::Entry, HashMap, VecDeque}, + time::{Duration, Instant}, +}; + +use tokio::sync::{mpsc, mpsc::error::TryRecvError, oneshot::Sender}; +use vecrem::VecExt; + +use crate::{ + adaptors::throttle::{request_lock::RequestLock, ChatIdHash, Limits, Settings}, + errors::AsResponseParameters, + requests::{Request, Requester}, +}; + +const MINUTE: Duration = Duration::from_secs(60); +const SECOND: Duration = Duration::from_secs(1); + +// Delay between worker iterations. +// +// For now it's `second/4`, but that number is chosen pretty randomly, we may +// want to change this. +const DELAY: Duration = Duration::from_millis(250); + +/// Minimal time beetween calls to queue_full function +const QUEUE_FULL_DELAY: Duration = Duration::from_secs(4); + +#[derive(Debug)] +pub(super) enum InfoMessage { + GetLimits { response: Sender }, + SetLimits { new: Limits, response: Sender<()> }, +} + +type RequestsSent = u32; + +// I wish there was special data structure for history which removed the +// need in 2 hashmaps +// (waffle) +#[derive(Default)] +struct RequestsSentToChats { + per_min: HashMap, + per_sec: HashMap, +} + +pub(super) struct FreezeUntil { + pub(super) until: Instant, + pub(super) after: Duration, + pub(super) chat: ChatIdHash, + pub(super) retry: Option, +} + +// Throttling is quite complicated. This comment describes the algorithm of the +// current implementation. +// +// ### Request +// +// When a throttling request is sent, it sends a tuple of `ChatId` and +// `Sender<()>` to the worker. Then the request waits for a notification from +// the worker. When notification is received, it sends the underlying request. +// +// ### Worker +// +// The worker does the most important job -- it ensures that the limits are +// never exceeded. +// +// The worker stores a history of requests sent in the last minute (and to which +// chats they were sent) and a queue of pending updates. +// +// The worker does the following algorithm loop: +// +// 1. If the queue is empty, wait for the first message in incoming channel (and +// add it to the queue). +// +// 2. Read all present messages from an incoming channel and transfer them to +// the queue. +// +// 3. Record the current time. +// +// 4. Clear the history from records whose time < (current time - minute). +// +// 5. Count all requests which were sent last second, `allowed = +// limit.messages_per_sec_overall - count`. +// +// 6. If `allowed == 0` wait a bit and `continue` to the next iteration. +// +// 7. Count how many requests were sent to which chats (i.e.: create +// `Map`). (Note: the same map, but for last minute also exists, +// but it's updated, instead of recreation.) +// +// 8. While `allowed >= 0` search for requests which chat haven't exceed the +// limits (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify +// the request that it can be now executed, increase counts, add record to the +// history. +pub(super) async fn worker( + Settings { + mut limits, + mut on_queue_full, + retry, + check_slow_mode, + }: Settings, + mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>, + mut info_rx: mpsc::Receiver, + bot: B, +) where + B: Requester, + B::Err: AsResponseParameters, +{ + // FIXME(waffle): Make an research about data structures for this queue. + // Currently this is O(n) removing (n = number of elements + // stayed), amortized O(1) push (vec+vecrem). + let mut queue: Vec<(ChatIdHash, RequestLock)> = + Vec::with_capacity(limits.messages_per_sec_overall as usize); + + let mut history: VecDeque<(ChatIdHash, Instant)> = VecDeque::new(); + let mut requests_sent = RequestsSentToChats::default(); + + let mut slow_mode: Option> = + check_slow_mode.then(HashMap::new); + + let mut rx_is_closed = false; + + let mut last_queue_full = Instant::now() + .checked_sub(QUEUE_FULL_DELAY) + .unwrap_or_else(Instant::now); + + let (freeze_tx, mut freeze_rx) = mpsc::channel::(1); + + while !rx_is_closed || !queue.is_empty() { + // FIXME(waffle): + // 1. If the `queue` is empty, `read_from_rx` call down below will 'block' + // execution untill a request is sent. While the execution is 'blocked' no + // `InfoMessage`s could be answered. + // + // 2. If limits are descreased, ideally we want to shrink queue. + // + // *blocked in asyncronous way + answer_info(&mut info_rx, &mut limits); + + freeze( + &mut freeze_rx, + &freeze_tx, + slow_mode.as_mut(), + &mut queue, + &bot, + None, + ) + .await; + + loop { + tokio::select! { + () = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break, + freeze_until = freeze_rx.recv() => { + freeze( + &mut freeze_rx, + &freeze_tx, + slow_mode.as_mut(), + &mut queue, + &bot, + freeze_until + ) + .await; + }, + } + } + //debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize); + + if queue.len() == queue.capacity() && last_queue_full.elapsed() > QUEUE_FULL_DELAY { + last_queue_full = Instant::now(); + tokio::spawn(on_queue_full(queue.len())); + } + + // _Maybe_ we need to use `spawn_blocking` here, because there is + // decent amount of blocking work. However _for now_ I've decided not + // to use it here. + // + // Reasons (not to use `spawn_blocking`): + // + // 1. The work seems not very CPU-bound, it's not heavy computations, + // it's more like light computations. + // + // 2. `spawn_blocking` is not zero-cost — it spawns a new system thread + // + do so other work. This may actually be *worse* then current + // "just do everything in this async fn" approach. + // + // 3. With `rt-threaded` feature, tokio uses [`num_cpus()`] threads + // which should be enough to work fine with one a-bit-blocking task. + // Crucially current behaviour will be problem mostly with + // single-threaded runtimes (and in case you're using one, you + // probably don't want to spawn unnecessary threads anyway). + // + // I think if we'll ever change this behaviour, we need to make it + // _configurable_. + // + // See also [discussion (ru)]. + // + // NOTE: If you are reading this because you have any problems because + // of this worker, open an [issue on github] + // + // [`num_cpus()`]: https://vee.gg/JGwq2 + // [discussion (ru)]: https://t.me/rust_async/27891 + // [issue on github]: https://github.com/teloxide/teloxide/issues/new + // + // (waffle) + + let now = Instant::now(); + let min_back = now - MINUTE; + let sec_back = now - SECOND; + + // make history and hchats up-to-date + while let Some((_, time)) = history.front() { + // history is sorted, we found first up-to-date thing + if time >= &min_back { + break; + } + + if let Some((chat, _)) = history.pop_front() { + let entry = requests_sent.per_min.entry(chat).and_modify(|count| { + *count -= 1; + }); + + if let Entry::Occupied(entry) = entry { + if *entry.get() == 0 { + entry.remove_entry(); + } + } + } + } + + // as truncates which is ok since in case of truncation it would always be >= + // limits.overall_s + let used = history + .iter() + .take_while(|(_, time)| time > &sec_back) + .count() as u32; + let mut allowed = limits.messages_per_sec_overall.saturating_sub(used); + + if allowed == 0 { + requests_sent.per_sec.clear(); + tokio::time::sleep(DELAY).await; + continue; + } + + for (chat, _) in history.iter().take_while(|(_, time)| time > &sec_back) { + *requests_sent.per_sec.entry(*chat).or_insert(0) += 1; + } + + let mut queue_removing = queue.removing(); + + while let Some(entry) = queue_removing.next() { + let chat = &entry.value().0; + + let slow_mode = slow_mode.as_mut().and_then(|sm| sm.get_mut(chat)); + + if let Some(&mut (delay, last)) = slow_mode { + if last + delay > Instant::now() { + continue; + } + } + + let requests_sent_per_sec_count = requests_sent.per_sec.get(chat).copied().unwrap_or(0); + let requests_sent_per_min_count = requests_sent.per_min.get(chat).copied().unwrap_or(0); + + let messages_per_min_limit = if chat.is_channel() { + limits.messages_per_min_channel + } else { + limits.messages_per_min_chat + }; + + let limits_not_exceeded = requests_sent_per_sec_count < limits.messages_per_sec_chat + && requests_sent_per_min_count < messages_per_min_limit; + + if limits_not_exceeded { + // Unlock the associated request. + + let chat = *chat; + let (_, lock) = entry.remove(); + + // Only count request as sent if the request wasn't dropped before unlocked + if lock.unlock(retry, freeze_tx.clone()).is_ok() { + *requests_sent.per_sec.entry(chat).or_insert(0) += 1; + *requests_sent.per_min.entry(chat).or_insert(0) += 1; + history.push_back((chat, Instant::now())); + + if let Some((_, last)) = slow_mode { + *last = Instant::now(); + } + + // We have "sent" one request, so now we can send one less. + allowed -= 1; + if allowed == 0 { + break; + } + } + } + } + + // It's easier to just recompute last second stats, instead of keeping + // track of it alongside with minute stats, so we just throw this away. + requests_sent.per_sec.clear(); + tokio::time::sleep(DELAY).await; + } +} + +fn answer_info(rx: &mut mpsc::Receiver, limits: &mut Limits) { + while let Ok(req) = rx.try_recv() { + // Errors are ignored with .ok(). Error means that the response channel + // is closed and the response isn't needed. + match req { + InfoMessage::GetLimits { response } => response.send(*limits).ok(), + InfoMessage::SetLimits { new, response } => { + *limits = new; + response.send(()).ok() + } + }; + } +} + +async fn freeze( + rx: &mut mpsc::Receiver, + tx: &mpsc::Sender, + mut slow_mode: Option<&mut HashMap>, + queue: &mut Vec<(ChatIdHash, RequestLock)>, + bot: &impl Requester, + mut imm: Option, +) { + while let Some(freeze_until) = imm.take().or_else(|| rx.try_recv().ok()) { + let FreezeUntil { + until, + after, + chat, + mut retry, + } = freeze_until; + + if let Some(slow_mode) = slow_mode.as_deref_mut() { + // TODO: do something with channels?... + if let hash @ ChatIdHash::Id(id) = chat { + // TODO: maybe not call `get_chat` every time? + + // At this point there isn't much we can do with the error besides ignoring + if let Ok(chat) = bot.get_chat(id).send().await { + match chat.slow_mode_delay() { + Some(delay) => { + let now = Instant::now(); + let new_delay = Duration::from_secs(delay.into()); + slow_mode.insert(hash, (new_delay, now)); + } + None => { + slow_mode.remove(&hash); + } + }; + } + } + } + + // slow mode is enabled and it is <= to the delay asked by telegram + let slow_mode_enabled_and_likely_the_cause = slow_mode + .as_ref() + .and_then(|m| m.get(&chat).map(|(delay, _)| delay <= &after)) + .unwrap_or(false); + + // Do not sleep if slow mode is enabled since the freeze is most likely caused + // by the said slow mode and not by the global limits. + if slow_mode_enabled_and_likely_the_cause { + queue.extend(Some(chat).zip(retry.take())); + } else { + log::warn!( + "freezing the bot for approximately {:?} due to `RetryAfter` error from telegram", + after + ); + + tokio::time::sleep_until(until.into()).await; + + log::warn!("unfreezing the bot"); + + if let Some(lock) = retry { + // Since we are already retrying the request, retries are obviously turned on. + let retry = true; + let _ = lock.unlock(retry, tx.clone()); + } + } + } +} + +async fn read_from_rx(rx: &mut mpsc::Receiver, queue: &mut Vec, rx_is_closed: &mut bool) { + if queue.is_empty() { + log::warn!("A-blocking on queue"); + match rx.recv().await { + Some(req) => queue.push(req), + None => *rx_is_closed = true, + } + } + + // Don't grow queue bigger than the capacity to limit DOS posibility + while queue.len() < queue.capacity() { + match rx.try_recv() { + Ok(req) => queue.push(req), + Err(TryRecvError::Disconnected) => *rx_is_closed = true, + // There are no items in queue. + Err(TryRecvError::Empty) => break, + } + } +}