Add exponential backoff strategy to the polling

This commit is contained in:
Сырцев Вадим Игоревич 2024-01-31 22:56:08 +03:00
parent 2945f4d301
commit b6e9dc8a81
3 changed files with 60 additions and 2 deletions

View file

@ -0,0 +1,10 @@
use std::time::Duration;
pub type BackoffStrategy = Box<dyn Fn(u32) -> Duration + Send>;
/// Calculates the backoff time in seconds for exponential strategy with base 2
///
/// More at: <https://en.wikipedia.org/wiki/Exponential_backoff#Exponential_backoff_algorithm>
pub fn exponential_backoff_strategy(error_count: u32) -> Duration {
Duration::from_secs(2_u64.pow(error_count))
}

View file

@ -135,6 +135,7 @@ pub use repls::{repl, repl_with_listener};
#[allow(deprecated)] #[allow(deprecated)]
pub use repls::{commands_repl, commands_repl_with_listener}; pub use repls::{commands_repl, commands_repl_with_listener};
pub mod backoff_strategy;
pub mod dispatching; pub mod dispatching;
pub mod error_handlers; pub mod error_handlers;
pub mod prelude; pub mod prelude;

View file

@ -12,8 +12,10 @@ use std::{
}; };
use futures::{ready, stream::Stream}; use futures::{ready, stream::Stream};
use tokio::time::{sleep, Sleep};
use crate::{ use crate::{
backoff_strategy::{exponential_backoff_strategy, BackoffStrategy},
requests::{HasPayload, Request, Requester}, requests::{HasPayload, Request, Requester},
stop::{mk_stop_token, StopFlag, StopToken}, stop::{mk_stop_token, StopFlag, StopToken},
types::{AllowedUpdate, Update}, types::{AllowedUpdate, Update},
@ -31,6 +33,7 @@ pub struct PollingBuilder<R> {
pub limit: Option<u8>, pub limit: Option<u8>,
pub allowed_updates: Option<Vec<AllowedUpdate>>, pub allowed_updates: Option<Vec<AllowedUpdate>>,
pub drop_pending_updates: bool, pub drop_pending_updates: bool,
pub backoff_strategy: BackoffStrategy,
} }
impl<R> PollingBuilder<R> impl<R> PollingBuilder<R>
@ -84,6 +87,14 @@ where
Self { drop_pending_updates: true, ..self } Self { drop_pending_updates: true, ..self }
} }
/// The backoff strategy that will be used for delay calculation between
/// reconnections
///
/// By default, the [`exponential_backoff_strategy`] is used
pub fn backoff_strategy(self, backoff_strategy: BackoffStrategy) -> Self {
Self { backoff_strategy, ..self }
}
/// Deletes webhook if it was set up. /// Deletes webhook if it was set up.
pub async fn delete_webhook(self) -> Self { pub async fn delete_webhook(self) -> Self {
delete_webhook_if_setup(&self.bot).await; delete_webhook_if_setup(&self.bot).await;
@ -96,7 +107,8 @@ where
/// ///
/// See also: [`polling_default`], [`Polling`]. /// See also: [`polling_default`], [`Polling`].
pub fn build(self) -> Polling<R> { pub fn build(self) -> Polling<R> {
let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; let Self { bot, timeout, limit, allowed_updates, drop_pending_updates, backoff_strategy } =
self;
let (token, flag) = mk_stop_token(); let (token, flag) = mk_stop_token();
let polling = Polling { let polling = Polling {
bot, bot,
@ -107,6 +119,7 @@ where
flag: Some(flag), flag: Some(flag),
token, token,
stop_token_cloned: false, stop_token_cloned: false,
backoff_strategy,
}; };
assert_update_listener(polling) assert_update_listener(polling)
@ -252,6 +265,7 @@ pub struct Polling<B: Requester> {
flag: Option<StopFlag>, flag: Option<StopFlag>,
token: StopToken, token: StopToken,
stop_token_cloned: bool, stop_token_cloned: bool,
backoff_strategy: BackoffStrategy,
} }
impl<R> Polling<R> impl<R> Polling<R>
@ -270,6 +284,7 @@ where
limit: None, limit: None,
allowed_updates: None, allowed_updates: None,
drop_pending_updates: false, drop_pending_updates: false,
backoff_strategy: Box::new(exponential_backoff_strategy),
} }
} }
@ -317,6 +332,14 @@ pub struct PollingStream<'a, B: Requester> {
/// The flag that notifies polling to stop polling. /// The flag that notifies polling to stop polling.
#[pin] #[pin]
flag: StopFlag, flag: StopFlag,
/// How long it takes to make next reconnection attempt
#[pin]
eepy: Option<Sleep>,
/// Counter for network errors occured during the current series of
/// reconnections
error_count: u32,
} }
impl<B: Requester + Send + 'static> UpdateListener for Polling<B> { impl<B: Requester + Send + 'static> UpdateListener for Polling<B> {
@ -369,6 +392,8 @@ impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a> for Polling<B> {
buffer: Vec::new().into_iter(), buffer: Vec::new().into_iter(),
in_flight: None, in_flight: None,
flag, flag,
eepy: None,
error_count: 0,
} }
} }
} }
@ -415,6 +440,9 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
return Ready(Some(Err(err))); return Ready(Some(Err(err)));
} }
Ok(updates) => { Ok(updates) => {
// Once we got the update hense the backoff reconnection strategy worked
*this.error_count = 0;
if let Some(upd) = updates.last() { if let Some(upd) = updates.last() {
*this.offset = upd.id.as_offset(); *this.offset = upd.id.as_offset();
} }
@ -424,7 +452,26 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
true => *this.drop_pending_updates = false, true => *this.drop_pending_updates = false,
} }
} }
Err(err) => return Ready(Some(Err(err))), Err(err) => {
// Prevents the CPU spike occuring at network connection lose: <https://github.com/teloxide/teloxide/issues/780>
let backoff_strategy = &this.polling.backoff_strategy;
this.eepy.set(Some(sleep(backoff_strategy(*this.error_count))));
log::trace!("set {:?} reconnection delay", backoff_strategy(*this.error_count));
return Ready(Some(Err(err)));
}
}
}
// Poll eepy future until completion, needed for backoff strategy
if let Some(eepy) = this.eepy.as_mut().as_pin_mut() {
match eepy.poll(cx) {
Poll::Ready(_) => {
// As soon as delay is waited we increment the counter
*this.error_count = this.error_count.saturating_add(1);
log::trace!("current error count: {}", *this.error_count);
log::trace!("backoff delay completed");
this.eepy.set(None);
}
Poll::Pending => return Poll::Pending,
} }
} }