Fix backoff bugs

This commit is contained in:
Сырцев Вадим Игоревич 2024-02-01 10:35:10 +03:00
parent 58f8a35825
commit cdca875b63
3 changed files with 19 additions and 15 deletions

View file

@ -4,7 +4,12 @@ pub type BackoffStrategy = Box<dyn Send + Fn(u32) -> Duration>;
/// Calculates the backoff time in seconds for exponential strategy with base 2
///
/// The maximum duration is limited to a little less than half an hour (1024
/// secs), so the successive timings are(in secs): 1, 2, 4, .., 1024, 1024, ..
///
/// More at: <https://en.wikipedia.org/wiki/Exponential_backoff#Exponential_backoff_algorithm>
pub fn exponential_backoff_strategy(error_count: u32) -> Duration {
Duration::from_secs((1_u64 << error_count).min(30 * 60))
// The error_count has to be limited so as not to cause overflow: 2^10 = 1024 ~
// a little less than half an hour
Duration::from_secs(1_u64 << error_count.min(10))
}

View file

@ -135,7 +135,7 @@ pub use repls::{repl, repl_with_listener};
#[allow(deprecated)]
pub use repls::{commands_repl, commands_repl_with_listener};
pub mod backoff_strategy;
pub mod backoff;
pub mod dispatching;
pub mod error_handlers;
pub mod prelude;

View file

@ -15,7 +15,7 @@ use futures::{ready, stream::Stream};
use tokio::time::{sleep, Sleep};
use crate::{
backoff_strategy::{exponential_backoff_strategy, BackoffStrategy},
backoff::{exponential_backoff_strategy, BackoffStrategy},
requests::{HasPayload, Request, Requester},
stop::{mk_stop_token, StopFlag, StopToken},
types::{AllowedUpdate, Update},
@ -91,8 +91,11 @@ where
/// reconnections caused by network errors.
///
/// By default, the [`exponential_backoff_strategy`] is used.
pub fn backoff_strategy(self, backoff_strategy: BackoffStrategy) -> Self {
Self { backoff_strategy, ..self }
pub fn backoff_strategy(
self,
backoff_strategy: impl 'static + Send + Fn(u32) -> Duration,
) -> Self {
Self { backoff_strategy: Box::new(backoff_strategy), ..self }
}
/// Deletes webhook if it was set up.
@ -463,16 +466,12 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
}
// Poll eepy future until completion, needed for backoff strategy
else if let Some(eepy) = this.eepy.as_mut().as_pin_mut() {
match eepy.poll(cx) {
Poll::Ready(_) => {
// As soon as delay is waited we increment the counter
*this.error_count = this.error_count.saturating_add(1);
log::trace!("current error count: {}", *this.error_count);
log::trace!("backoff delay completed");
this.eepy.set(None);
}
Poll::Pending => return Poll::Pending,
}
ready!(eepy.poll(cx));
// As soon as delay is waited we increment the counter
*this.error_count = this.error_count.saturating_add(1);
log::trace!("current error count: {}", *this.error_count);
log::trace!("backoff delay completed");
this.eepy.as_mut().set(None);
}
let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {