Add option to drop pending updates w/ polling

Former-commit-id: 612f47d242
This commit is contained in:
Maybe Waffle 2022-06-27 02:58:48 +04:00
parent 60fc833108
commit f9da86f881

View file

@ -27,6 +27,7 @@ pub struct PollingBuilder<R> {
timeout: Option<Duration>, timeout: Option<Duration>,
limit: Option<u8>, limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>, allowed_updates: Option<Vec<AllowedUpdate>>,
drop_pending_updates: bool,
} }
impl<R> PollingBuilder<R> impl<R> PollingBuilder<R>
@ -74,6 +75,11 @@ where
Self { allowed_updates: Some(allowed_updates), ..self } Self { allowed_updates: Some(allowed_updates), ..self }
} }
/// Drops pending updates.
pub fn drop_pending_updates(self) -> Self {
Self { drop_pending_updates: true, ..self }
}
/// Deletes webhook if it was set up. /// Deletes webhook if it was set up.
pub async fn delete_webhook(self) -> Self { pub async fn delete_webhook(self) -> Self {
delete_webhook_if_setup(&self.bot).await; delete_webhook_if_setup(&self.bot).await;
@ -86,9 +92,9 @@ where
/// ///
/// See also: [`polling_default`], [`Polling`]. /// See also: [`polling_default`], [`Polling`].
pub fn build(self) -> Polling<R> { pub fn build(self) -> Polling<R> {
let Self { bot, timeout, limit, allowed_updates } = self; let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
let (token, flag) = AsyncStopToken::new_pair(); let (token, flag) = AsyncStopToken::new_pair();
Polling { bot, timeout, limit, allowed_updates, flag, token } Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token }
} }
} }
@ -98,7 +104,13 @@ where
R: Requester + Send + 'static, R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send, <R as Requester>::GetUpdates: Send,
{ {
PollingBuilder { bot, timeout: None, limit: None, allowed_updates: None } PollingBuilder {
bot,
timeout: None,
limit: None,
allowed_updates: None,
drop_pending_updates: false,
}
} }
/// Returns a long polling update listener with `timeout` of 10 seconds. /// Returns a long polling update listener with `timeout` of 10 seconds.
@ -232,6 +244,7 @@ pub struct Polling<B: Requester> {
timeout: Option<Duration>, timeout: Option<Duration>,
limit: Option<u8>, limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>, allowed_updates: Option<Vec<AllowedUpdate>>,
drop_pending_updates: bool,
flag: AsyncStopFlag, flag: AsyncStopFlag,
token: AsyncStopToken, token: AsyncStopToken,
} }
@ -241,6 +254,9 @@ pub struct PollingStream<'a, B: Requester> {
/// Parent structure /// Parent structure
polling: &'a mut Polling<B>, polling: &'a mut Polling<B>,
/// Whatever to drop pending updates or not.
drop_pending_updates: bool,
/// Timeout parameter for normal `get_updates()` calls. /// Timeout parameter for normal `get_updates()` calls.
timeout: Option<u32>, timeout: Option<u32>,
/// Allowed updates parameter for the first `get_updates()` call. /// Allowed updates parameter for the first `get_updates()` call.
@ -285,8 +301,10 @@ impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling<B> {
fn as_stream(&'a mut self) -> Self::Stream { fn as_stream(&'a mut self) -> Self::Stream {
let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
let allowed_updates = self.allowed_updates.clone(); let allowed_updates = self.allowed_updates.clone();
let drop_pending_updates = self.drop_pending_updates;
PollingStream { PollingStream {
polling: self, polling: self,
drop_pending_updates,
timeout, timeout,
allowed_updates, allowed_updates,
offset: 0, offset: 0,
@ -326,7 +344,10 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
*this.offset = upd.id + 1; *this.offset = upd.id + 1;
} }
*this.buffer = updates.into_iter(); match *this.drop_pending_updates {
false => *this.buffer = updates.into_iter(),
true => *this.drop_pending_updates = false,
}
} }
Err(err) => return Ready(Some(Err(err))), Err(err) => return Ready(Some(Err(err))),
} }
@ -337,13 +358,18 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
return Ready(Some(Ok(upd))); return Ready(Some(Ok(upd)));
} }
// When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` *this.stopping = this.polling.flag.is_stopped();
// set last seen update (offset) and return immediately let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {
let (timeout, limit) = if this.polling.flag.is_stopped() { // Normal `get_updates()` call
*this.stopping = true; (false, false) => (*this.offset, this.polling.limit, *this.timeout),
(Some(0), Some(1)) // Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending
} else { // updates)
(*this.timeout, this.polling.limit) //
// When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()`
// set last seen update (offset) and return immediately
(true, _) => (*this.offset, Some(1), Some(0)),
// Drop pending updates
(_, true) => (-1, Some(1), Some(0)),
}; };
let req = this let req = this
@ -351,7 +377,7 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
.bot .bot
.get_updates() .get_updates()
.with_payload_mut(|pay| { .with_payload_mut(|pay| {
pay.offset = Some(*this.offset); pay.offset = Some(offset);
pay.timeout = timeout; pay.timeout = timeout;
pay.limit = limit; pay.limit = limit;
pay.allowed_updates = this.allowed_updates.take(); pay.allowed_updates = this.allowed_updates.take();