diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6cfe45b5..37321ac8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Greatly improved the speed of graceful shutdown (`^C`) ([PR 938](https://github.com/teloxide/teloxide/pull/938))
 - Fix typos in docstrings ([PR 953](https://github.com/teloxide/teloxide/pull/953))
 - Use `Seconds` instead of `String` in `InlineQueryResultAudio` for `audio_duration` ([PR 994](https://github.com/teloxide/teloxide/pull/994))
+- High CPU usage on network errors ([PR 1002](https://github.com/teloxide/teloxide/pull/1002), [Issue 780](https://github.com/teloxide/teloxide/issues/780))
 
 ### Changed
 
diff --git a/crates/teloxide/src/backoff.rs b/crates/teloxide/src/backoff.rs
new file mode 100644
index 00000000..26819a1d
--- /dev/null
+++ b/crates/teloxide/src/backoff.rs
@@ -0,0 +1,15 @@
+use std::time::Duration;
+
+pub type BackoffStrategy = Box<dyn Send + Fn(u32) -> Duration>;
+
+/// Calculates the backoff time in seconds for the exponential strategy with base 2.
+///
+/// The maximum duration is limited to a little less than half an hour (1024
+/// secs), so the successive timings are (in secs): 1, 2, 4, .., 1024, 1024, ..
+///
+/// More at: <https://en.wikipedia.org/wiki/Exponential_backoff>
+pub fn exponential_backoff_strategy(error_count: u32) -> Duration {
+    // The error_count has to be limited so as not to cause overflow: 2^10 = 1024 ~
+    // a little less than half an hour
+    Duration::from_secs(1_u64 << error_count.min(10))
+}
diff --git a/crates/teloxide/src/lib.rs b/crates/teloxide/src/lib.rs
index 836138c2..4535d843 100644
--- a/crates/teloxide/src/lib.rs
+++ b/crates/teloxide/src/lib.rs
@@ -135,6 +135,7 @@ pub use repls::{repl, repl_with_listener};
 #[allow(deprecated)]
 pub use repls::{commands_repl, commands_repl_with_listener};
 
+pub mod backoff;
 pub mod dispatching;
 pub mod error_handlers;
 pub mod prelude;
diff --git a/crates/teloxide/src/update_listeners/polling.rs b/crates/teloxide/src/update_listeners/polling.rs
index 62d63076..e39ba977 100644
--- a/crates/teloxide/src/update_listeners/polling.rs
+++ b/crates/teloxide/src/update_listeners/polling.rs
@@ -12,8 +12,10 @@ use std::{
 };
 
 use futures::{ready, stream::Stream};
+use tokio::time::{sleep, Sleep};
 
 use crate::{
+    backoff::{exponential_backoff_strategy, BackoffStrategy},
     requests::{HasPayload, Request, Requester},
     stop::{mk_stop_token, StopFlag, StopToken},
     types::{AllowedUpdate, Update},
@@ -31,6 +33,7 @@ pub struct PollingBuilder<R> {
     pub limit: Option<u8>,
     pub allowed_updates: Option<Vec<AllowedUpdate>>,
     pub drop_pending_updates: bool,
+    pub backoff_strategy: BackoffStrategy,
 }
 
 impl<R> PollingBuilder<R>
@@ -84,6 +87,17 @@ where
         Self { drop_pending_updates: true, ..self }
     }
 
+    /// The backoff strategy that will be used to calculate the delay between
+    /// reconnection attempts caused by network errors.
+    ///
+    /// By default, the [`exponential_backoff_strategy`] is used.
+    pub fn backoff_strategy(
+        self,
+        backoff_strategy: impl 'static + Send + Fn(u32) -> Duration,
+    ) -> Self {
+        Self { backoff_strategy: Box::new(backoff_strategy), ..self }
+    }
+
     /// Deletes webhook if it was set up.
     pub async fn delete_webhook(self) -> Self {
         delete_webhook_if_setup(&self.bot).await;
@@ -96,7 +110,8 @@ where
     ///
     /// See also: [`polling_default`], [`Polling`].
     pub fn build(self) -> Polling<R> {
-        let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
+        let Self { bot, timeout, limit, allowed_updates, drop_pending_updates, backoff_strategy } =
+            self;
         let (token, flag) = mk_stop_token();
         let polling = Polling {
             bot,
@@ -107,6 +122,7 @@ where
             flag: Some(flag),
             token,
             stop_token_cloned: false,
+            backoff_strategy,
         };
 
         assert_update_listener(polling)
@@ -252,6 +268,7 @@ pub struct Polling<B: Requester> {
     flag: Option<StopFlag>,
     token: StopToken,
     stop_token_cloned: bool,
+    backoff_strategy: BackoffStrategy,
 }
 
 impl<B> Polling<B>
@@ -270,6 +287,7 @@ where
             limit: None,
             allowed_updates: None,
             drop_pending_updates: false,
+            backoff_strategy: Box::new(exponential_backoff_strategy),
         }
     }
 
@@ -317,6 +335,14 @@ pub struct PollingStream<'a, B: Requester> {
     /// The flag that notifies polling to stop polling.
     #[pin]
     flag: StopFlag,
+
+    /// How long to wait before making the next reconnection attempt.
+    #[pin]
+    eepy: Option<Sleep>,
+
+    /// Counter for network errors that occurred during the current series of
+    /// reconnection attempts.
+    error_count: u32,
 }
 
 impl<B: Requester + Send + 'static> UpdateListener for Polling<B> {
@@ -369,6 +395,8 @@ impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a> for Polling<B> {
             buffer: Vec::new().into_iter(),
             in_flight: None,
             flag,
+            eepy: None,
+            error_count: 0,
         }
     }
 }
@@ -415,6 +443,9 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
                     return Ready(Some(Err(err)));
                 }
                 Ok(updates) => {
+                    // We got updates, so the backoff reconnection strategy worked; reset the counter
+                    *this.error_count = 0;
+
                     if let Some(upd) = updates.last() {
                         *this.offset = upd.id.as_offset();
                     }
@@ -424,9 +455,24 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
                         true => *this.drop_pending_updates = false,
                     }
                 }
-                Err(err) => return Ready(Some(Err(err))),
+                Err(err) => {
+                    // Prevents the CPU spike occurring on network connection loss: <https://github.com/teloxide/teloxide/issues/780>
+                    let backoff_strategy = &this.polling.backoff_strategy;
+                    this.eepy.set(Some(sleep(backoff_strategy(*this.error_count))));
+                    log::trace!("set {:?} reconnection delay", backoff_strategy(*this.error_count));
+                    return Ready(Some(Err(err)));
+                }
             }
         }
+        // Poll the `eepy` (sleep) future to completion; this enforces the backoff delay
+        else if let Some(eepy) = this.eepy.as_mut().as_pin_mut() {
+            ready!(eepy.poll(cx));
+            // Once the delay has elapsed, increment the error counter
+            *this.error_count = this.error_count.saturating_add(1);
+            log::trace!("current error count: {}", *this.error_count);
+            log::trace!("backoff delay completed");
+            this.eepy.as_mut().set(None);
+        }
 
         let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {
             // Normal `get_updates()` call
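
For reference, here is what the default strategy added in `crates/teloxide/src/backoff.rs` yields as the error count grows. This standalone sketch (not part of the diff) just prints the delay sequence; the loop bound and output format are illustrative only:

```rust
use teloxide::backoff::exponential_backoff_strategy;

fn main() {
    // Delays double per consecutive error and saturate at 2^10 = 1024 secs:
    // 1, 2, 4, ..., 1024, 1024, ...
    for error_count in 0..=12 {
        println!("error #{error_count}: {:?}", exponential_backoff_strategy(error_count));
    }
}
```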
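And a minimal sketch of how a consumer might override the default via the new `backoff_strategy` builder method. Only `Polling::builder`, `backoff_strategy`, and `build` come from this PR; the `Bot::from_env()` setup, the no-op handler, and the constant 5-second delay are assumptions made for illustration:

```rust
use std::time::Duration;

use teloxide::{prelude::*, update_listeners::Polling};

#[tokio::main]
async fn main() {
    let bot = Bot::from_env();

    // Swap the default exponential strategy for a constant 5-second delay
    // between reconnection attempts (a hypothetical choice).
    let listener = Polling::builder(bot.clone())
        .backoff_strategy(|_error_count| Duration::from_secs(5))
        .build();

    teloxide::repl_with_listener(
        bot,
        |_msg: Message| async { respond(()) },
        listener,
    )
    .await;
}
```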