mirror of
https://github.com/teloxide/teloxide.git
synced 2025-01-09 11:43:57 +01:00
Make Throttle
settings more extendable
Add a `#[non_exhaustive]` `Settings` builder struct. Replace `with_queue_full_fn` and `spawn_with_queue_full_fn` with `with_settings` and `spawn_with_settings`. This allows to add more settings in the future without breaking changes.
This commit is contained in:
parent
bd104a0a08
commit
4377abd149
2 changed files with 81 additions and 41 deletions
|
@ -11,7 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
- `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`} ([#76][pr76])
|
- `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`} ([#76][pr76])
|
||||||
- `DefaultParseMode::parse_mode` which allows to get currently used default parse mode ([#77][pr77])
|
- `DefaultParseMode::parse_mode` which allows to get currently used default parse mode ([#77][pr77])
|
||||||
- `Thrrotle::{limits,set_limits,with_queue_full_fn,spawn_with_queue_full_fn}` functions ([#77][pr77])
|
- `Thrrotle::{limits,set_limits}` functions ([#77][pr77])
|
||||||
|
- `Throttle::{with_settings,spawn_with_settings}` and `throttle::Settings` ([#96][pr96])
|
||||||
- Getters for fields nested in `Chat` ([#80][pr80])
|
- Getters for fields nested in `Chat` ([#80][pr80])
|
||||||
- API errors: `ApiError::NotEnoughRightsToManagePins`, `ApiError::BotKickedFromSupergroup` ([#84][pr84])
|
- API errors: `ApiError::NotEnoughRightsToManagePins`, `ApiError::BotKickedFromSupergroup` ([#84][pr84])
|
||||||
- Telegram bot API 5.2 support ([#86][pr86])
|
- Telegram bot API 5.2 support ([#86][pr86])
|
||||||
|
@ -24,6 +25,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
[pr84]: https://github.com/teloxide/teloxide-core/pull/84
|
[pr84]: https://github.com/teloxide/teloxide-core/pull/84
|
||||||
[pr86]: https://github.com/teloxide/teloxide-core/pull/86
|
[pr86]: https://github.com/teloxide/teloxide-core/pull/86
|
||||||
[pr90]: https://github.com/teloxide/teloxide-core/pull/90
|
[pr90]: https://github.com/teloxide/teloxide-core/pull/90
|
||||||
|
[pr96]: https://github.com/teloxide/teloxide-core/pull/96
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
|
future::ready,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
FutureExt,
|
FutureExt,
|
||||||
};
|
};
|
||||||
|
@ -75,30 +76,22 @@ impl<B> Throttle<B> {
|
||||||
/// Note: [`Throttle`] will only send requests if returned worker is
|
/// Note: [`Throttle`] will only send requests if returned worker is
|
||||||
/// polled/spawned/awaited.
|
/// polled/spawned/awaited.
|
||||||
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
|
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
|
||||||
Self::with_queue_full_fn(bot, limits, |pending| async move {
|
let settings = Settings {
|
||||||
log::warn!("Throttle queue is full ({} pending requests)", pending);
|
limits,
|
||||||
})
|
..<_>::default()
|
||||||
|
};
|
||||||
|
Self::with_settings(bot, settings)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates new [`Throttle`] alongside with worker future.
|
/// Creates new [`Throttle`] alongside with worker future.
|
||||||
///
|
///
|
||||||
/// `queue_full` function is called when internal queue is full.
|
|
||||||
///
|
|
||||||
/// Note: [`Throttle`] will only send requests if returned worker is
|
/// Note: [`Throttle`] will only send requests if returned worker is
|
||||||
/// polled/spawned/awaited.
|
/// polled/spawned/awaited.
|
||||||
pub fn with_queue_full_fn<F, Fut>(
|
pub fn with_settings(bot: B, settings: Settings) -> (Self, impl Future<Output = ()>) {
|
||||||
bot: B,
|
let (tx, rx) = mpsc::channel(settings.limits.messages_per_sec_overall as usize);
|
||||||
limits: Limits,
|
|
||||||
queue_full: F,
|
|
||||||
) -> (Self, impl Future<Output = ()>)
|
|
||||||
where
|
|
||||||
F: Fn(usize) -> Fut,
|
|
||||||
Fut: Future<Output = ()> + Send + 'static,
|
|
||||||
{
|
|
||||||
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
|
|
||||||
let (info_tx, info_rx) = mpsc::channel(2);
|
let (info_tx, info_rx) = mpsc::channel(2);
|
||||||
|
|
||||||
let worker = worker(limits, rx, info_rx, queue_full);
|
let worker = worker(settings, rx, info_rx);
|
||||||
let this = Self {
|
let this = Self {
|
||||||
bot,
|
bot,
|
||||||
queue: tx,
|
queue: tx,
|
||||||
|
@ -114,16 +107,18 @@ impl<B> Throttle<B> {
|
||||||
///
|
///
|
||||||
/// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle
|
/// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle
|
||||||
pub fn new_spawn(bot: B, limits: Limits) -> Self {
|
pub fn new_spawn(bot: B, limits: Limits) -> Self {
|
||||||
// new/with_queue_full_fn copypasted here to avoid [rust-lang/#76882]
|
// new/with_settings copypasted here to avoid [rust-lang/#76882]
|
||||||
//
|
//
|
||||||
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
|
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
|
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
|
||||||
let (info_tx, info_rx) = mpsc::channel(2);
|
let (info_tx, info_rx) = mpsc::channel(2);
|
||||||
|
|
||||||
let worker = worker(limits, rx, info_rx, |pending| async move {
|
let settings = Settings {
|
||||||
log::warn!("Throttle queue is full ({} pending requests)", pending);
|
limits,
|
||||||
});
|
..<_>::default()
|
||||||
|
};
|
||||||
|
let worker = worker(settings, rx, info_rx);
|
||||||
let this = Self {
|
let this = Self {
|
||||||
bot,
|
bot,
|
||||||
queue: tx,
|
queue: tx,
|
||||||
|
@ -135,21 +130,15 @@ impl<B> Throttle<B> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
|
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
|
||||||
///
|
pub fn spawn_with_settings(bot: B, settings: Settings) -> Self {
|
||||||
/// `queue_full` function is called when internal queue is full.
|
// with_settings copypasted here to avoid [rust-lang/#76882]
|
||||||
pub fn spawn_with_queue_full_fn<F, Fut>(bot: B, limits: Limits, queue_full: F) -> Self
|
|
||||||
where
|
|
||||||
F: Fn(usize) -> Fut + Send + 'static,
|
|
||||||
Fut: Future<Output = ()> + Send + 'static,
|
|
||||||
{
|
|
||||||
// with_queue_full_fn copypasted here to avoid [rust-lang/#76882]
|
|
||||||
//
|
//
|
||||||
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
|
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
|
let (tx, rx) = mpsc::channel(settings.limits.messages_per_sec_overall as usize);
|
||||||
let (info_tx, info_rx) = mpsc::channel(2);
|
let (info_tx, info_rx) = mpsc::channel(2);
|
||||||
|
|
||||||
let worker = worker(limits, rx, info_rx, queue_full);
|
let worker = worker(settings, rx, info_rx);
|
||||||
let this = Self {
|
let this = Self {
|
||||||
bot,
|
bot,
|
||||||
queue: tx,
|
queue: tx,
|
||||||
|
@ -197,8 +186,6 @@ impl<B> Throttle<B> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const WORKER_DIED: &str = "worker died before last `Throttle` instance";
|
|
||||||
|
|
||||||
/// Telegram request limits.
|
/// Telegram request limits.
|
||||||
///
|
///
|
||||||
/// This struct is used in [`Throttle`].
|
/// This struct is used in [`Throttle`].
|
||||||
|
@ -232,6 +219,58 @@ impl Default for Limits {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Settings used by [`Throttle`] adaptor.
|
||||||
|
///
|
||||||
|
/// ## Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use teloxide_core::adaptors::throttle;
|
||||||
|
///
|
||||||
|
/// let settings = throttle::Settings::default()
|
||||||
|
/// .on_queue_full(|pending| async move { /* do something when internal queue is full */ });
|
||||||
|
/// // use settings in `Throttle::with_settings` or other constructors
|
||||||
|
/// # let _ = settings;
|
||||||
|
/// ```
|
||||||
|
#[non_exhaustive]
|
||||||
|
pub struct Settings {
|
||||||
|
pub limits: Limits,
|
||||||
|
pub on_queue_full: BoxedFnMut<usize, BoxedFuture>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Settings {
|
||||||
|
pub fn limits(mut self, val: Limits) -> Self {
|
||||||
|
self.limits = val;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn on_queue_full<F, Fut>(mut self, mut val: F) -> Self
|
||||||
|
where
|
||||||
|
F: FnMut(usize) -> Fut + Send + 'static,
|
||||||
|
Fut: Future<Output = ()> + Send + 'static,
|
||||||
|
{
|
||||||
|
self.on_queue_full = Box::new(move |pending| Box::pin(val(pending)));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Settings {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
limits: <_>::default(),
|
||||||
|
on_queue_full: Box::new(|pending| {
|
||||||
|
log::warn!("Throttle queue is full ({} pending requests)", pending);
|
||||||
|
Box::pin(ready(()))
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Required to not trigger `clippy::type-complexity` lint
|
||||||
|
type BoxedFnMut<I, O> = Box<dyn FnMut(I) -> O + Send>;
|
||||||
|
type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
|
||||||
|
|
||||||
|
const WORKER_DIED: &str = "worker died before last `Throttle` instance";
|
||||||
|
|
||||||
const MINUTE: Duration = Duration::from_secs(60);
|
const MINUTE: Duration = Duration::from_secs(60);
|
||||||
const SECOND: Duration = Duration::from_secs(1);
|
const SECOND: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
@ -304,15 +343,14 @@ struct RequestsSentToChats {
|
||||||
// the request that it can be now executed, increase counts, add record to the
|
// the request that it can be now executed, increase counts, add record to the
|
||||||
// history.
|
// history.
|
||||||
|
|
||||||
async fn worker<F, Fut>(
|
async fn worker(
|
||||||
mut limits: Limits,
|
Settings {
|
||||||
|
mut limits,
|
||||||
|
mut on_queue_full,
|
||||||
|
}: Settings,
|
||||||
mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>,
|
mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>,
|
||||||
mut info_rx: mpsc::Receiver<InfoMessage>,
|
mut info_rx: mpsc::Receiver<InfoMessage>,
|
||||||
queue_full: F,
|
) {
|
||||||
) where
|
|
||||||
F: Fn(usize) -> Fut,
|
|
||||||
Fut: Future<Output = ()> + Send + 'static,
|
|
||||||
{
|
|
||||||
// FIXME(waffle): Make an research about data structures for this queue.
|
// FIXME(waffle): Make an research about data structures for this queue.
|
||||||
// Currently this is O(n) removing (n = number of elements
|
// Currently this is O(n) removing (n = number of elements
|
||||||
// stayed), amortized O(1) push (vec+vecrem).
|
// stayed), amortized O(1) push (vec+vecrem).
|
||||||
|
@ -344,7 +382,7 @@ async fn worker<F, Fut>(
|
||||||
|
|
||||||
if queue.len() == queue.capacity() && last_queue_full.elapsed() > QUEUE_FULL_DELAY {
|
if queue.len() == queue.capacity() && last_queue_full.elapsed() > QUEUE_FULL_DELAY {
|
||||||
last_queue_full = Instant::now();
|
last_queue_full = Instant::now();
|
||||||
tokio::spawn(queue_full(queue.len()));
|
tokio::spawn(on_queue_full(queue.len()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// _Maybe_ we need to use `spawn_blocking` here, because there is
|
// _Maybe_ we need to use `spawn_blocking` here, because there is
|
||||||
|
|
Loading…
Reference in a new issue