From cdca875b63d37bf0009c5ec09e951c6043ee9e17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D1=8B=D1=80=D1=86=D0=B5=D0=B2=20=D0=92=D0=B0=D0=B4?= =?UTF-8?q?=D0=B8=D0=BC=20=D0=98=D0=B3=D0=BE=D1=80=D0=B5=D0=B2=D0=B8=D1=87?= Date: Thu, 1 Feb 2024 10:35:10 +0300 Subject: [PATCH] Fix backoff bugs --- .../src/{backoff_strategy.rs => backoff.rs} | 7 +++++- crates/teloxide/src/lib.rs | 2 +- .../teloxide/src/update_listeners/polling.rs | 25 +++++++++---------- 3 files changed, 19 insertions(+), 15 deletions(-) rename crates/teloxide/src/{backoff_strategy.rs => backoff.rs} (50%) diff --git a/crates/teloxide/src/backoff_strategy.rs b/crates/teloxide/src/backoff.rs similarity index 50% rename from crates/teloxide/src/backoff_strategy.rs rename to crates/teloxide/src/backoff.rs index b4541d6a..26819a1d 100644 --- a/crates/teloxide/src/backoff_strategy.rs +++ b/crates/teloxide/src/backoff.rs @@ -4,7 +4,12 @@ pub type BackoffStrategy = Box Duration>; /// Calculates the backoff time in seconds for exponential strategy with base 2 /// +/// The maximum duration is limited to a little less than half an hour (1024 +/// secs), so the successive timings are(in secs): 1, 2, 4, .., 1024, 1024, .. +/// /// More at: pub fn exponential_backoff_strategy(error_count: u32) -> Duration { - Duration::from_secs((1_u64 << error_count).min(30 * 60)) + // The error_count has to be limited so as not to cause overflow: 2^10 = 1024 ~ + // a little less than half an hour + Duration::from_secs(1_u64 << error_count.min(10)) } diff --git a/crates/teloxide/src/lib.rs b/crates/teloxide/src/lib.rs index 2ed44c89..4535d843 100644 --- a/crates/teloxide/src/lib.rs +++ b/crates/teloxide/src/lib.rs @@ -135,7 +135,7 @@ pub use repls::{repl, repl_with_listener}; #[allow(deprecated)] pub use repls::{commands_repl, commands_repl_with_listener}; -pub mod backoff_strategy; +pub mod backoff; pub mod dispatching; pub mod error_handlers; pub mod prelude; diff --git a/crates/teloxide/src/update_listeners/polling.rs b/crates/teloxide/src/update_listeners/polling.rs index 4d7e55a9..e39ba977 100644 --- a/crates/teloxide/src/update_listeners/polling.rs +++ b/crates/teloxide/src/update_listeners/polling.rs @@ -15,7 +15,7 @@ use futures::{ready, stream::Stream}; use tokio::time::{sleep, Sleep}; use crate::{ - backoff_strategy::{exponential_backoff_strategy, BackoffStrategy}, + backoff::{exponential_backoff_strategy, BackoffStrategy}, requests::{HasPayload, Request, Requester}, stop::{mk_stop_token, StopFlag, StopToken}, types::{AllowedUpdate, Update}, @@ -91,8 +91,11 @@ where /// reconnections caused by network errors. /// /// By default, the [`exponential_backoff_strategy`] is used. - pub fn backoff_strategy(self, backoff_strategy: BackoffStrategy) -> Self { - Self { backoff_strategy, ..self } + pub fn backoff_strategy( + self, + backoff_strategy: impl 'static + Send + Fn(u32) -> Duration, + ) -> Self { + Self { backoff_strategy: Box::new(backoff_strategy), ..self } } /// Deletes webhook if it was set up. @@ -463,16 +466,12 @@ impl Stream for PollingStream<'_, B> { } // Poll eepy future until completion, needed for backoff strategy else if let Some(eepy) = this.eepy.as_mut().as_pin_mut() { - match eepy.poll(cx) { - Poll::Ready(_) => { - // As soon as delay is waited we increment the counter - *this.error_count = this.error_count.saturating_add(1); - log::trace!("current error count: {}", *this.error_count); - log::trace!("backoff delay completed"); - this.eepy.set(None); - } - Poll::Pending => return Poll::Pending, - } + ready!(eepy.poll(cx)); + // As soon as delay is waited we increment the counter + *this.error_count = this.error_count.saturating_add(1); + log::trace!("current error count: {}", *this.error_count); + log::trace!("backoff delay completed"); + this.eepy.as_mut().set(None); } let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {