From f9da86f88138738c0708cba0b4dd4c0163e294d3 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 02:58:48 +0400 Subject: [PATCH] Add option to drop pending updates w/ polling Former-commit-id: 612f47d242f89ffb256ed387ca2253231f8693fc --- src/dispatching/update_listeners/polling.rs | 50 ++++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 1fc6273f..e3ad7ab5 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -27,6 +27,7 @@ pub struct PollingBuilder { timeout: Option, limit: Option, allowed_updates: Option>, + drop_pending_updates: bool, } impl PollingBuilder @@ -74,6 +75,11 @@ where Self { allowed_updates: Some(allowed_updates), ..self } } + /// Drops pending updates. + pub fn drop_pending_updates(self) -> Self { + Self { drop_pending_updates: true, ..self } + } + /// Deletes webhook if it was set up. pub async fn delete_webhook(self) -> Self { delete_webhook_if_setup(&self.bot).await; @@ -86,9 +92,9 @@ where /// /// See also: [`polling_default`], [`Polling`]. pub fn build(self) -> Polling { - let Self { bot, timeout, limit, allowed_updates } = self; + let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; let (token, flag) = AsyncStopToken::new_pair(); - Polling { bot, timeout, limit, allowed_updates, flag, token } + Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token } } } @@ -98,7 +104,13 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - PollingBuilder { bot, timeout: None, limit: None, allowed_updates: None } + PollingBuilder { + bot, + timeout: None, + limit: None, + allowed_updates: None, + drop_pending_updates: false, + } } /// Returns a long polling update listener with `timeout` of 10 seconds. @@ -232,6 +244,7 @@ pub struct Polling { timeout: Option, limit: Option, allowed_updates: Option>, + drop_pending_updates: bool, flag: AsyncStopFlag, token: AsyncStopToken, } @@ -241,6 +254,9 @@ pub struct PollingStream<'a, B: Requester> { /// Parent structure polling: &'a mut Polling, + /// Whatever to drop pending updates or not. + drop_pending_updates: bool, + /// Timeout parameter for normal `get_updates()` calls. timeout: Option, /// Allowed updates parameter for the first `get_updates()` call. @@ -285,8 +301,10 @@ impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling { fn as_stream(&'a mut self) -> Self::Stream { let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); let allowed_updates = self.allowed_updates.clone(); + let drop_pending_updates = self.drop_pending_updates; PollingStream { polling: self, + drop_pending_updates, timeout, allowed_updates, offset: 0, @@ -326,7 +344,10 @@ impl Stream for PollingStream<'_, B> { *this.offset = upd.id + 1; } - *this.buffer = updates.into_iter(); + match *this.drop_pending_updates { + false => *this.buffer = updates.into_iter(), + true => *this.drop_pending_updates = false, + } } Err(err) => return Ready(Some(Err(err))), } @@ -337,13 +358,18 @@ impl Stream for PollingStream<'_, B> { return Ready(Some(Ok(upd))); } - // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` - // set last seen update (offset) and return immediately - let (timeout, limit) = if this.polling.flag.is_stopped() { - *this.stopping = true; - (Some(0), Some(1)) - } else { - (*this.timeout, this.polling.limit) + *this.stopping = this.polling.flag.is_stopped(); + let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) { + // Normal `get_updates()` call + (false, false) => (*this.offset, this.polling.limit, *this.timeout), + // Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending + // updates) + // + // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` + // set last seen update (offset) and return immediately + (true, _) => (*this.offset, Some(1), Some(0)), + // Drop pending updates + (_, true) => (-1, Some(1), Some(0)), }; let req = this @@ -351,7 +377,7 @@ impl Stream for PollingStream<'_, B> { .bot .get_updates() .with_payload_mut(|pay| { - pay.offset = Some(*this.offset); + pay.offset = Some(offset); pay.timeout = timeout; pay.limit = limit; pay.allowed_updates = this.allowed_updates.take();