From ad303ef130b2818dce74229d9dcfa04a42208d86 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Thu, 11 Jan 2024 01:27:49 +0100 Subject: [PATCH] Slightly simplify `Polling` listener by moving update dropping --- .../teloxide/src/update_listeners/polling.rs | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/crates/teloxide/src/update_listeners/polling.rs b/crates/teloxide/src/update_listeners/polling.rs index a9e9837c..18a62b49 100644 --- a/crates/teloxide/src/update_listeners/polling.rs +++ b/crates/teloxide/src/update_listeners/polling.rs @@ -13,6 +13,7 @@ use std::{ use futures::{ready, stream::Stream}; use crate::{ + payloads::GetUpdatesSetters as _, requests::{HasPayload, Request, Requester}, stop::{mk_stop_token, StopFlag, StopToken}, types::{AllowedUpdate, Update}, @@ -286,9 +287,6 @@ pub struct PollingStream<'a, B: Requester> { /// Parent structure polling: &'a mut Polling, - /// Whatever to drop pending updates or not. - drop_pending_updates: bool, - /// Timeout parameter for normal `get_updates()` calls. timeout: Option, /// Allowed updates parameter for the first `get_updates()` call. @@ -324,19 +322,19 @@ impl UpdateListener for Polling { Box::pin(async { let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); let allowed_updates = self.allowed_updates.clone(); - let drop_pending_updates = self.drop_pending_updates; // FIXME: document that `listen` is a destructive operation, actually, // and you need to call `stop_token` *again* after it self.reinit_stop_flag_if_needed(); - // FIXME: do update dropping *here* + if self.drop_pending_updates { + self.bot.get_updates().offset(-1).limit(1).timeout(0).await?; + } // Unwrap: just called reinit let flag = self.flag.take().unwrap(); let stream = PollingStream { polling: self, - drop_pending_updates, timeout, allowed_updates, offset: 0, @@ -409,29 +407,23 @@ impl Stream for PollingStream<'_, B> { *this.offset = upd.id.as_offset(); } - match *this.drop_pending_updates { - false => *this.buffer = updates.into_iter(), - true => *this.drop_pending_updates = false, - } + *this.buffer = updates.into_iter(); } Err(err) => return Ready(Some(Err(err))), } } - let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) { + let (offset, limit, timeout) = match this.stopping { // Normal `get_updates()` call - (false, false) => (*this.offset, this.polling.limit, *this.timeout), - // Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending - // updates) + false => (*this.offset, this.polling.limit, *this.timeout), + // Graceful shutdown `get_updates()` call // // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` // set last seen update (offset) and return immediately - (true, _) => { + true => { log::trace!("graceful shutdown `get_updates` call"); (*this.offset, Some(1), Some(0)) } - // Drop pending updates - (_, true) => (-1, Some(1), Some(0)), }; let req = this