diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b9af44c..86a7ccae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`} ([#76][pr76]) - `DefaultParseMode::parse_mode` which allows to get currently used default parse mode ([#77][pr77]) -- `Thrrotle::{limits,set_limits,with_queue_full_fn,spawn_with_queue_full_fn}` functions ([#77][pr77]) +- `Thrrotle::{limits,set_limits}` functions ([#77][pr77]) +- `Throttle::{with_settings,spawn_with_settings}` and `throttle::Settings` ([#96][pr96]) - Getters for fields nested in `Chat` ([#80][pr80]) - API errors: `ApiError::NotEnoughRightsToManagePins`, `ApiError::BotKickedFromSupergroup` ([#84][pr84]) - Telegram bot API 5.2 support ([#86][pr86]) @@ -24,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [pr84]: https://github.com/teloxide/teloxide-core/pull/84 [pr86]: https://github.com/teloxide/teloxide-core/pull/86 [pr90]: https://github.com/teloxide/teloxide-core/pull/90 +[pr96]: https://github.com/teloxide/teloxide-core/pull/96 ### Changed diff --git a/src/adaptors/throttle.rs b/src/adaptors/throttle.rs index 36005e49..7a43f255 100644 --- a/src/adaptors/throttle.rs +++ b/src/adaptors/throttle.rs @@ -7,6 +7,7 @@ use std::{ }; use futures::{ + future::ready, task::{Context, Poll}, FutureExt, }; @@ -75,30 +76,22 @@ impl Throttle { /// Note: [`Throttle`] will only send requests if returned worker is /// polled/spawned/awaited. pub fn new(bot: B, limits: Limits) -> (Self, impl Future) { - Self::with_queue_full_fn(bot, limits, |pending| async move { - log::warn!("Throttle queue is full ({} pending requests)", pending); - }) + let settings = Settings { + limits, + ..<_>::default() + }; + Self::with_settings(bot, settings) } /// Creates new [`Throttle`] alongside with worker future. /// - /// `queue_full` function is called when internal queue is full. - /// /// Note: [`Throttle`] will only send requests if returned worker is /// polled/spawned/awaited. - pub fn with_queue_full_fn( - bot: B, - limits: Limits, - queue_full: F, - ) -> (Self, impl Future) - where - F: Fn(usize) -> Fut, - Fut: Future + Send + 'static, - { - let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize); + pub fn with_settings(bot: B, settings: Settings) -> (Self, impl Future) { + let (tx, rx) = mpsc::channel(settings.limits.messages_per_sec_overall as usize); let (info_tx, info_rx) = mpsc::channel(2); - let worker = worker(limits, rx, info_rx, queue_full); + let worker = worker(settings, rx, info_rx); let this = Self { bot, queue: tx, @@ -114,16 +107,18 @@ impl Throttle { /// /// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle pub fn new_spawn(bot: B, limits: Limits) -> Self { - // new/with_queue_full_fn copypasted here to avoid [rust-lang/#76882] + // new/with_settings copypasted here to avoid [rust-lang/#76882] // // [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882 let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize); let (info_tx, info_rx) = mpsc::channel(2); - let worker = worker(limits, rx, info_rx, |pending| async move { - log::warn!("Throttle queue is full ({} pending requests)", pending); - }); + let settings = Settings { + limits, + ..<_>::default() + }; + let worker = worker(settings, rx, info_rx); let this = Self { bot, queue: tx, @@ -135,21 +130,15 @@ impl Throttle { } /// Creates new [`Throttle`] spawning the worker with `tokio::spawn` - /// - /// `queue_full` function is called when internal queue is full. - pub fn spawn_with_queue_full_fn(bot: B, limits: Limits, queue_full: F) -> Self - where - F: Fn(usize) -> Fut + Send + 'static, - Fut: Future + Send + 'static, - { - // with_queue_full_fn copypasted here to avoid [rust-lang/#76882] + pub fn spawn_with_settings(bot: B, settings: Settings) -> Self { + // with_settings copypasted here to avoid [rust-lang/#76882] // // [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882 - let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize); + let (tx, rx) = mpsc::channel(settings.limits.messages_per_sec_overall as usize); let (info_tx, info_rx) = mpsc::channel(2); - let worker = worker(limits, rx, info_rx, queue_full); + let worker = worker(settings, rx, info_rx); let this = Self { bot, queue: tx, @@ -197,8 +186,6 @@ impl Throttle { } } -const WORKER_DIED: &str = "worker died before last `Throttle` instance"; - /// Telegram request limits. /// /// This struct is used in [`Throttle`]. @@ -232,6 +219,58 @@ impl Default for Limits { } } +/// Settings used by [`Throttle`] adaptor. +/// +/// ## Examples +/// +/// ``` +/// use teloxide_core::adaptors::throttle; +/// +/// let settings = throttle::Settings::default() +/// .on_queue_full(|pending| async move { /* do something when internal queue is full */ }); +/// // use settings in `Throttle::with_settings` or other constructors +/// # let _ = settings; +/// ``` +#[non_exhaustive] +pub struct Settings { + pub limits: Limits, + pub on_queue_full: BoxedFnMut, +} + +impl Settings { + pub fn limits(mut self, val: Limits) -> Self { + self.limits = val; + self + } + + pub fn on_queue_full(mut self, mut val: F) -> Self + where + F: FnMut(usize) -> Fut + Send + 'static, + Fut: Future + Send + 'static, + { + self.on_queue_full = Box::new(move |pending| Box::pin(val(pending))); + self + } +} + +impl Default for Settings { + fn default() -> Self { + Self { + limits: <_>::default(), + on_queue_full: Box::new(|pending| { + log::warn!("Throttle queue is full ({} pending requests)", pending); + Box::pin(ready(())) + }), + } + } +} + +// Required to not trigger `clippy::type-complexity` lint +type BoxedFnMut = Box O + Send>; +type BoxedFuture = Pin + Send>>; + +const WORKER_DIED: &str = "worker died before last `Throttle` instance"; + const MINUTE: Duration = Duration::from_secs(60); const SECOND: Duration = Duration::from_secs(1); @@ -304,15 +343,14 @@ struct RequestsSentToChats { // the request that it can be now executed, increase counts, add record to the // history. -async fn worker( - mut limits: Limits, +async fn worker( + Settings { + mut limits, + mut on_queue_full, + }: Settings, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>, mut info_rx: mpsc::Receiver, - queue_full: F, -) where - F: Fn(usize) -> Fut, - Fut: Future + Send + 'static, -{ +) { // FIXME(waffle): Make an research about data structures for this queue. // Currently this is O(n) removing (n = number of elements // stayed), amortized O(1) push (vec+vecrem). @@ -344,7 +382,7 @@ async fn worker( if queue.len() == queue.capacity() && last_queue_full.elapsed() > QUEUE_FULL_DELAY { last_queue_full = Instant::now(); - tokio::spawn(queue_full(queue.len())); + tokio::spawn(on_queue_full(queue.len())); } // _Maybe_ we need to use `spawn_blocking` here, because there is