mirror of
https://github.com/teloxide/teloxide.git
synced 2024-12-22 14:35:36 +01:00
Slightly simplify Polling
listener by moving update dropping
This commit is contained in:
parent
d60552a878
commit
ad303ef130
1 changed files with 9 additions and 17 deletions
|
@ -13,6 +13,7 @@ use std::{
|
||||||
use futures::{ready, stream::Stream};
|
use futures::{ready, stream::Stream};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
payloads::GetUpdatesSetters as _,
|
||||||
requests::{HasPayload, Request, Requester},
|
requests::{HasPayload, Request, Requester},
|
||||||
stop::{mk_stop_token, StopFlag, StopToken},
|
stop::{mk_stop_token, StopFlag, StopToken},
|
||||||
types::{AllowedUpdate, Update},
|
types::{AllowedUpdate, Update},
|
||||||
|
@ -286,9 +287,6 @@ pub struct PollingStream<'a, B: Requester> {
|
||||||
/// Parent structure
|
/// Parent structure
|
||||||
polling: &'a mut Polling<B>,
|
polling: &'a mut Polling<B>,
|
||||||
|
|
||||||
/// Whatever to drop pending updates or not.
|
|
||||||
drop_pending_updates: bool,
|
|
||||||
|
|
||||||
/// Timeout parameter for normal `get_updates()` calls.
|
/// Timeout parameter for normal `get_updates()` calls.
|
||||||
timeout: Option<u32>,
|
timeout: Option<u32>,
|
||||||
/// Allowed updates parameter for the first `get_updates()` call.
|
/// Allowed updates parameter for the first `get_updates()` call.
|
||||||
|
@ -324,19 +322,19 @@ impl<B: Requester + Send + 'static> UpdateListener for Polling<B> {
|
||||||
Box::pin(async {
|
Box::pin(async {
|
||||||
let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
|
let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
|
||||||
let allowed_updates = self.allowed_updates.clone();
|
let allowed_updates = self.allowed_updates.clone();
|
||||||
let drop_pending_updates = self.drop_pending_updates;
|
|
||||||
|
|
||||||
// FIXME: document that `listen` is a destructive operation, actually,
|
// FIXME: document that `listen` is a destructive operation, actually,
|
||||||
// and you need to call `stop_token` *again* after it
|
// and you need to call `stop_token` *again* after it
|
||||||
self.reinit_stop_flag_if_needed();
|
self.reinit_stop_flag_if_needed();
|
||||||
|
|
||||||
// FIXME: do update dropping *here*
|
if self.drop_pending_updates {
|
||||||
|
self.bot.get_updates().offset(-1).limit(1).timeout(0).await?;
|
||||||
|
}
|
||||||
|
|
||||||
// Unwrap: just called reinit
|
// Unwrap: just called reinit
|
||||||
let flag = self.flag.take().unwrap();
|
let flag = self.flag.take().unwrap();
|
||||||
let stream = PollingStream {
|
let stream = PollingStream {
|
||||||
polling: self,
|
polling: self,
|
||||||
drop_pending_updates,
|
|
||||||
timeout,
|
timeout,
|
||||||
allowed_updates,
|
allowed_updates,
|
||||||
offset: 0,
|
offset: 0,
|
||||||
|
@ -409,29 +407,23 @@ impl<B: Requester> Stream for PollingStream<'_, B> {
|
||||||
*this.offset = upd.id.as_offset();
|
*this.offset = upd.id.as_offset();
|
||||||
}
|
}
|
||||||
|
|
||||||
match *this.drop_pending_updates {
|
*this.buffer = updates.into_iter();
|
||||||
false => *this.buffer = updates.into_iter(),
|
|
||||||
true => *this.drop_pending_updates = false,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(err) => return Ready(Some(Err(err))),
|
Err(err) => return Ready(Some(Err(err))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {
|
let (offset, limit, timeout) = match this.stopping {
|
||||||
// Normal `get_updates()` call
|
// Normal `get_updates()` call
|
||||||
(false, false) => (*this.offset, this.polling.limit, *this.timeout),
|
false => (*this.offset, this.polling.limit, *this.timeout),
|
||||||
// Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending
|
// Graceful shutdown `get_updates()` call
|
||||||
// updates)
|
|
||||||
//
|
//
|
||||||
// When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()`
|
// When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()`
|
||||||
// set last seen update (offset) and return immediately
|
// set last seen update (offset) and return immediately
|
||||||
(true, _) => {
|
true => {
|
||||||
log::trace!("graceful shutdown `get_updates` call");
|
log::trace!("graceful shutdown `get_updates` call");
|
||||||
(*this.offset, Some(1), Some(0))
|
(*this.offset, Some(1), Some(0))
|
||||||
}
|
}
|
||||||
// Drop pending updates
|
|
||||||
(_, true) => (-1, Some(1), Some(0)),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let req = this
|
let req = this
|
||||||
|
|
Loading…
Reference in a new issue