Merge pull request #1002 from syrtcevvi/network_down_cpu_spike_fix
Add exponential backoff strategy to the polling
Commit 9c70cc4a0b
4 changed files with 65 additions and 2 deletions
CHANGELOG.md
@@ -51,6 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Greatly improved the speed of graceful shutdown (`^C`) ([PR 938](https://github.com/teloxide/teloxide/pull/938))
 - Fix typos in docstrings ([PR 953](https://github.com/teloxide/teloxide/pull/953))
 - Use `Seconds` instead of `String` in `InlineQueryResultAudio` for `audio_duration` ([PR 994](https://github.com/teloxide/teloxide/pull/994))
+- High CPU usage on network errors ([PR 1002](https://github.com/teloxide/teloxide/pull/1002), [Issue 780](https://github.com/teloxide/teloxide/issues/780))

 ### Changed

crates/teloxide/src/backoff.rs (new file, 15 lines)
@@ -0,0 +1,15 @@
+use std::time::Duration;
+
+pub type BackoffStrategy = Box<dyn Send + Fn(u32) -> Duration>;
+
+/// Calculates the backoff time in seconds for an exponential strategy with base 2.
+///
+/// The maximum duration is capped at 1024 secs (roughly 17 minutes), so the
+/// successive timings are (in secs): 1, 2, 4, ..., 1024, 1024, ...
+///
+/// More at: <https://en.wikipedia.org/wiki/Exponential_backoff#Exponential_backoff_algorithm>
+pub fn exponential_backoff_strategy(error_count: u32) -> Duration {
+    // The error_count has to be limited so as not to cause overflow:
+    // 2^10 = 1024 secs, roughly 17 minutes
+    Duration::from_secs(1_u64 << error_count.min(10))
+}
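To make the cap concrete, here is a small, self-contained sketch (not part of the patch) that reproduces the function above and prints the delay sequence it yields:

```rust
use std::time::Duration;

// Mirrors the function added in this patch; duplicated here only so the
// example runs on its own.
fn exponential_backoff_strategy(error_count: u32) -> Duration {
    Duration::from_secs(1_u64 << error_count.min(10))
}

fn main() {
    // Prints 1s, 2s, 4s, ..., 512s, 1024s, 1024s, 1024s: the cap kicks in at 2^10.
    for error_count in 0..=12 {
        println!("attempt {error_count}: {:?}", exponential_backoff_strategy(error_count));
    }
}
```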
crates/teloxide/src/lib.rs
@@ -135,6 +135,7 @@ pub use repls::{repl, repl_with_listener};
 #[allow(deprecated)]
 pub use repls::{commands_repl, commands_repl_with_listener};

+pub mod backoff;
 pub mod dispatching;
 pub mod error_handlers;
 pub mod prelude;
crates/teloxide/src/update_listeners/polling.rs
@@ -12,8 +12,10 @@ use std::{
 };

 use futures::{ready, stream::Stream};
+use tokio::time::{sleep, Sleep};

 use crate::{
+    backoff::{exponential_backoff_strategy, BackoffStrategy},
     requests::{HasPayload, Request, Requester},
     stop::{mk_stop_token, StopFlag, StopToken},
     types::{AllowedUpdate, Update},
@@ -31,6 +33,7 @@ pub struct PollingBuilder<R> {
     pub limit: Option<u8>,
     pub allowed_updates: Option<Vec<AllowedUpdate>>,
     pub drop_pending_updates: bool,
+    pub backoff_strategy: BackoffStrategy,
 }

 impl<R> PollingBuilder<R>
@@ -84,6 +87,17 @@ where
         Self { drop_pending_updates: true, ..self }
     }

+    /// The backoff strategy that will be used for delay calculation between
+    /// reconnections caused by network errors.
+    ///
+    /// By default, the [`exponential_backoff_strategy`] is used.
+    pub fn backoff_strategy(
+        self,
+        backoff_strategy: impl 'static + Send + Fn(u32) -> Duration,
+    ) -> Self {
+        Self { backoff_strategy: Box::new(backoff_strategy), ..self }
+    }
+
     /// Deletes webhook if it was set up.
     pub async fn delete_webhook(self) -> Self {
         delete_webhook_if_setup(&self.bot).await;
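For context, a minimal usage sketch of the builder method above (not part of the patch): it swaps the default exponential strategy for a flat five-second delay. The `Bot::from_env()` call and the `TELOXIDE_TOKEN` environment variable are assumptions from the wider teloxide API, not from this diff.

```rust
use std::time::Duration;

use teloxide::{prelude::*, update_listeners::Polling};

#[tokio::main]
async fn main() {
    // Assumes TELOXIDE_TOKEN is set; the bot itself is outside this patch.
    let bot = Bot::from_env();

    // Replace the default exponential_backoff_strategy with a constant
    // five-second delay between reconnection attempts.
    let _listener = Polling::builder(bot)
        .backoff_strategy(|_error_count| Duration::from_secs(5))
        .build();
}
```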
@@ -96,7 +110,8 @@ where
     ///
     /// See also: [`polling_default`], [`Polling`].
     pub fn build(self) -> Polling<R> {
-        let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
+        let Self { bot, timeout, limit, allowed_updates, drop_pending_updates, backoff_strategy } =
+            self;
         let (token, flag) = mk_stop_token();
         let polling = Polling {
             bot,
@@ -107,6 +122,7 @@ where
             flag: Some(flag),
             token,
             stop_token_cloned: false,
+            backoff_strategy,
         };

         assert_update_listener(polling)
@@ -252,6 +268,7 @@ pub struct Polling<B: Requester> {
     flag: Option<StopFlag>,
     token: StopToken,
     stop_token_cloned: bool,
+    backoff_strategy: BackoffStrategy,
 }

 impl<R> Polling<R>
@@ -270,6 +287,7 @@ where
             limit: None,
             allowed_updates: None,
             drop_pending_updates: false,
+            backoff_strategy: Box::new(exponential_backoff_strategy),
         }
     }

@@ -317,6 +335,14 @@ pub struct PollingStream<'a, B: Requester> {
     /// The flag that notifies polling to stop polling.
     #[pin]
     flag: StopFlag,
+
+    /// How long it takes to make the next reconnection attempt
+    #[pin]
+    eepy: Option<Sleep>,
+
+    /// Counter for network errors that occurred during the current series of
+    /// reconnections
+    error_count: u32,
 }

 impl<B: Requester + Send + 'static> UpdateListener for Polling<B> {
@@ -369,6 +395,8 @@ impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a> for Polling<B> {
             buffer: Vec::new().into_iter(),
             in_flight: None,
             flag,
+            eepy: None,
+            error_count: 0,
         }
     }
 }
@@ -415,6 +443,9 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
                    return Ready(Some(Err(err)));
                }
                Ok(updates) => {
+                    // Once we get updates, the backoff reconnection strategy has worked
+                    *this.error_count = 0;
+
                    if let Some(upd) = updates.last() {
                        *this.offset = upd.id.as_offset();
                    }
@@ -424,9 +455,24 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
                        true => *this.drop_pending_updates = false,
                    }
                }
-                Err(err) => return Ready(Some(Err(err))),
+                Err(err) => {
+                    // Prevents the CPU spike occurring when the network connection is lost: <https://github.com/teloxide/teloxide/issues/780>
+                    let backoff_strategy = &this.polling.backoff_strategy;
+                    this.eepy.set(Some(sleep(backoff_strategy(*this.error_count))));
+                    log::trace!("set {:?} reconnection delay", backoff_strategy(*this.error_count));
+                    return Ready(Some(Err(err)));
+                }
            }
        }
+        // Poll the eepy future until completion; needed for the backoff strategy
+        else if let Some(eepy) = this.eepy.as_mut().as_pin_mut() {
+            ready!(eepy.poll(cx));
+            // As soon as the delay has elapsed, increment the error counter
+            *this.error_count = this.error_count.saturating_add(1);
+            log::trace!("current error count: {}", *this.error_count);
+            log::trace!("backoff delay completed");
+            this.eepy.as_mut().set(None);
+        }

        let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {
            // Normal `get_updates()` call
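Taken together, the stream's error handling amounts to the loop sketched below. This is an illustration rather than teloxide code: `get_updates_somehow` is a made-up stand-in for the real `GetUpdates` request, but the counter reset on success, the sleep before reconnecting, and the post-sleep increment mirror the patch.

```rust
use std::time::Duration;
use tokio::time::sleep;

// Same shape as the strategy added in backoff.rs, duplicated for a runnable sketch.
fn exponential_backoff_strategy(error_count: u32) -> Duration {
    Duration::from_secs(1_u64 << error_count.min(10))
}

// Hypothetical stand-in for a long-polling request: fails for the first
// three attempts to simulate the network being down.
async fn get_updates_somehow(attempt: u32) -> Result<Vec<String>, &'static str> {
    if attempt < 3 { Err("network down") } else { Ok(vec!["update".to_owned()]) }
}

#[tokio::main]
async fn main() {
    let mut error_count: u32 = 0;
    let mut attempt = 0;

    loop {
        match get_updates_somehow(attempt).await {
            Ok(_updates) => {
                // A successful poll means the backoff strategy worked: reset the counter.
                error_count = 0;
                break;
            }
            Err(_e) => {
                // Wait before reconnecting, then bump the counter, mirroring how
                // PollingStream drives its `eepy` sleep to completion first.
                sleep(exponential_backoff_strategy(error_count)).await;
                error_count = error_count.saturating_add(1);
            }
        }
        attempt += 1;
    }
}
```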