From a839b47106589464bc88f018ebd196c52cf87729 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 00:24:54 +0400 Subject: [PATCH 01/14] Add polling builder Former-commit-id: 6729c965fa0a74585cefefdb1d09c1b0b446db4f --- src/dispatching/update_listeners.rs | 2 +- src/dispatching/update_listeners/polling.rs | 84 ++++++++++++++++++--- 2 files changed, 75 insertions(+), 11 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index f3e358c5..206e90f9 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -43,7 +43,7 @@ mod polling; mod stateful_listener; pub use self::{ - polling::{polling, polling_default}, + polling::{polling, polling_builder, polling_default, PollingBuilder}, stateful_listener::StatefulListener, }; diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 3987779d..e229be98 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -15,20 +15,84 @@ use crate::{ types::{AllowedUpdate, Update}, }; -/// Returns a long polling update listener with `timeout` of 10 seconds. -/// -/// See also: [`polling`](polling). -/// -/// ## Notes -/// -/// This function will automatically delete a webhook if it was set up. -pub async fn polling_default(requester: R) -> impl UpdateListener +/// Builder for polling update listener. +pub struct PollingBuilder { + bot: R, + timeout: Option, + limit: Option, + allowed_updates: Option>, +} + +impl PollingBuilder where R: Requester + Send + 'static, ::GetUpdates: Send, { - delete_webhook_if_setup(&requester).await; - polling(requester, Some(Duration::from_secs(10)), None, None) + /// Set timeout. + pub fn timeout(self, timeout: Duration) -> Self { + Self { timeout: Some(timeout), ..self } + } + + /// Set limit. + /// + /// ## Panics + /// + /// If `limit` is greater than 100. + #[track_caller] + pub fn limit(self, limit: u8) -> Self { + assert!(limit <= 100, "Maximum limit is 100"); + + Self { limit: Some(limit), ..self } + } + + /// Set allowed updates. + /// + /// ## Note + /// + /// Teloxide normally (when using [`Dispatcher`] or repls) sets this + /// automatically. + /// + /// [`Dispatcher`]: crate::dispatching::Dispatcher + pub fn allowed_updates(self, allowed_updates: Vec) -> Self { + Self { allowed_updates: Some(allowed_updates), ..self } + } + + /// Deletes webhook if it was set up. + pub async fn delete_webhook(self) -> Self { + delete_webhook_if_setup(&self.bot).await; + + self + } + + /// Creates a polling update listener. + pub fn build(self) -> impl UpdateListener { + let Self { bot, timeout, limit, allowed_updates } = self; + polling(bot, timeout, limit, allowed_updates) + } +} + +/// Returns a builder for polling update listener. +pub fn polling_builder(bot: R) -> PollingBuilder +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + PollingBuilder { bot, timeout: None, limit: None, allowed_updates: None } +} + +/// Returns a long polling update listener with `timeout` of 10 seconds. +/// +/// See also: [`polling_builder`]. +/// +/// ## Notes +/// +/// This function will automatically delete a webhook if it was set up. +pub async fn polling_default(bot: R) -> impl UpdateListener +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + polling_builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build() } #[cfg_attr(doc, aquamarine::aquamarine)] From 2ceccdf442d4bb38ba0a1027534d4e757ac872dd Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 02:03:34 +0400 Subject: [PATCH 02/14] Implement polling stream by hand IMO it's actually clearer & nicer than the old impl. +The types are now nameable. Former-commit-id: 82fc756aaba5f5dde03fd186b1fcddd3f938b09e --- src/dispatching/update_listeners/polling.rs | 244 ++++++++++++-------- 1 file changed, 151 insertions(+), 93 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index e229be98..50f570ee 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -1,16 +1,22 @@ -use std::{convert::TryInto, time::Duration}; - -use futures::{ - future::{ready, Either}, - stream::{self, Stream, StreamExt}, +use std::{ + convert::TryInto, + future::Future, + pin::Pin, + task::{ + self, + Poll::{self, Ready}, + }, + time::Duration, + vec, }; +use futures::{ready, stream::Stream}; + use crate::{ dispatching::{ stop_token::{AsyncStopFlag, AsyncStopToken}, - update_listeners::{stateful_listener::StatefulListener, UpdateListener}, + update_listeners::{AsUpdateStream, UpdateListener}, }, - payloads::{GetUpdates, GetUpdatesSetters as _}, requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, Update}, }; @@ -197,93 +203,8 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - struct State { - bot: B, - timeout: Option, - limit: Option, - allowed_updates: Option>, - offset: i32, - flag: AsyncStopFlag, - token: AsyncStopToken, - force_stop: bool, - } - - fn stream(st: &mut State) -> impl Stream> + Send + '_ - where - B: Requester + Send, - ::GetUpdates: Send, - { - stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, flag, force_stop, .. } = - &mut *state; - - if *force_stop { - return None; - } - - if flag.is_stopped() { - let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1); - req.payload_mut().allowed_updates = allowed_updates.take(); - - return match req.send().await { - Ok(_) => None, - Err(err) => { - // Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496 - *force_stop = true; - - Some((Either::Left(stream::once(ready(Err(err)))), state)) - } - }; - } - - let mut req = bot.get_updates(); - *req.payload_mut() = GetUpdates { - offset: Some(*offset), - timeout: *timeout, - limit: *limit, - allowed_updates: allowed_updates.take(), - }; - - match req.send().await { - Ok(updates) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - *offset = upd.id + 1; - } - - let updates = updates.into_iter().map(Ok); - Some((Either::Right(stream::iter(updates)), state)) - } - Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), - } - }) - .flatten() - } - let (token, flag) = AsyncStopToken::new_pair(); - - let state = State { - bot, - timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), - limit, - allowed_updates, - offset: 0, - flag, - token, - force_stop: false, - }; - - let stop_token = |st: &mut State<_>| st.token.clone(); - - let hint_allowed_updates = - Some(|state: &mut State<_>, allowed: &mut dyn Iterator| { - // TODO: we should probably warn if there already were different allowed updates - // before - state.allowed_updates = Some(allowed.collect()); - }); - let timeout_hint = Some(move |_: &State<_>| timeout); - - StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint) + Polling { bot, timeout, limit, allowed_updates, flag, token } } async fn delete_webhook_if_setup(requester: &R) @@ -307,6 +228,143 @@ where } } +struct Polling { + bot: B, + timeout: Option, + limit: Option, + allowed_updates: Option>, + flag: AsyncStopFlag, + token: AsyncStopToken, +} + +#[pin_project::pin_project] +struct PollingStream<'a, B: Requester> { + /// Parent structure + polling: &'a mut Polling, + + /// Timeout parameter for normal `get_updates()` calls. + timeout: Option, + /// Allowed updates parameter for the first `get_updates()` call. + allowed_updates: Option>, + /// Offset parameter for normal `get_updates()` calls. + offset: i32, + + /// If this is set, return `None` from `poll_next` immediately. + force_stop: bool, + /// If true we've sent last `get_updates()` call for graceful shutdown. + stopping: bool, + + /// Buffer of updates to be yielded. + buffer: vec::IntoIter, + + /// In-flight `get_updates()` call. + #[pin] + in_flight: Option<::Send>, +} + +impl UpdateListener for Polling { + type StopToken = AsyncStopToken; + + fn stop_token(&mut self) -> Self::StopToken { + self.token.clone() + } + + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator) { + // TODO: we should probably warn if there already were different allowed updates + // before + self.allowed_updates = Some(hint.collect()); + } + + fn timeout_hint(&self) -> Option { + self.timeout + } +} + +impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling { + type Stream = PollingStream<'a, B>; + + fn as_stream(&'a mut self) -> Self::Stream { + let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); + let allowed_updates = self.allowed_updates.clone(); + PollingStream { + polling: self, + timeout, + allowed_updates, + offset: 0, + force_stop: false, + stopping: false, + buffer: Vec::new().into_iter(), + in_flight: None, + } + } +} + +impl Stream for PollingStream<'_, B> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + if *this.force_stop { + return Ready(None); + } + + // Poll in-flight future until completion + if let Some(in_flight) = this.in_flight.as_mut().as_pin_mut() { + let res = ready!(in_flight.poll(cx)); + this.in_flight.set(None); + + match res { + Ok(_) if *this.stopping => return Ready(None), + Err(err) if *this.stopping => { + // Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496 + *this.force_stop = true; + + return Ready(Some(Err(err))); + } + Ok(updates) => { + if let Some(upd) = updates.last() { + *this.offset = upd.id + 1; + } + + *this.buffer = updates.into_iter(); + } + Err(err) => return Ready(Some(Err(err))), + } + } + + // If there are any buffered updates, return one + if let Some(upd) = this.buffer.next() { + return Ready(Some(Ok(upd))); + } + + // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` + // set last seen update (offset) and return immediately + let (timeout, limit) = if this.polling.flag.is_stopped() { + *this.stopping = true; + (Some(0), Some(1)) + } else { + (*this.timeout, this.polling.limit) + }; + + let req = this + .polling + .bot + .get_updates() + .with_payload_mut(|pay| { + pay.offset = Some(*this.offset); + pay.timeout = timeout; + pay.limit = limit; + pay.allowed_updates = this.allowed_updates.take(); + }) + .send(); + this.in_flight.set(Some(req)); + + // Recurse to poll `self.in_flight` + self.poll_next(cx) + } +} + #[test] fn polling_is_send() { use crate::dispatching::update_listeners::AsUpdateStream; From e51c4c774c08f2aadebd73b7fdc13986f2c8771d Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 02:06:29 +0400 Subject: [PATCH 03/14] Return named `Polling<_>` type from polling* functions This replaces the `impl UpdateListener` and makes using polling nicer. Former-commit-id: db417caa528119ba1deff929226960a4f909b94d --- src/dispatching/update_listeners/polling.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 50f570ee..d827e0cf 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -71,7 +71,7 @@ where } /// Creates a polling update listener. - pub fn build(self) -> impl UpdateListener { + pub fn build(self) -> Polling { let Self { bot, timeout, limit, allowed_updates } = self; polling(bot, timeout, limit, allowed_updates) } @@ -93,7 +93,7 @@ where /// ## Notes /// /// This function will automatically delete a webhook if it was set up. -pub async fn polling_default(bot: R) -> impl UpdateListener +pub async fn polling_default(bot: R) -> Polling where R: Requester + Send + 'static, ::GetUpdates: Send, @@ -198,7 +198,7 @@ pub fn polling( timeout: Option, limit: Option, allowed_updates: Option>, -) -> impl UpdateListener +) -> Polling where R: Requester + Send + 'static, ::GetUpdates: Send, @@ -228,7 +228,7 @@ where } } -struct Polling { +pub struct Polling { bot: B, timeout: Option, limit: Option, @@ -238,7 +238,7 @@ struct Polling { } #[pin_project::pin_project] -struct PollingStream<'a, B: Requester> { +pub struct PollingStream<'a, B: Requester> { /// Parent structure polling: &'a mut Polling, From 060886737fccc02794b66f828700ebabd223c419 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 02:28:59 +0400 Subject: [PATCH 04/14] Deprecate `polling()`, expose `Polling{,Builder}` and fix docs Former-commit-id: e0e5da30baad1f93e263894f8048f7b1d7b8040e --- src/dispatching/update_listeners.rs | 3 +- src/dispatching/update_listeners/polling.rs | 164 ++++++++++---------- 2 files changed, 84 insertions(+), 83 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 206e90f9..677f681e 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -42,8 +42,9 @@ use crate::{ mod polling; mod stateful_listener; +#[allow(deprecated)] pub use self::{ - polling::{polling, polling_builder, polling_default, PollingBuilder}, + polling::{polling, polling_builder, polling_default, Polling, PollingBuilder, PollingStream}, stateful_listener::StatefulListener, }; diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index d827e0cf..00a5c013 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -34,12 +34,20 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - /// Set timeout. + /// A timeout in seconds for polling. + /// + /// ## Note + /// + /// `timeout` should not be bigger than http client timeout, see + /// [`default_reqwest_settings`] for default http client settings. + /// + /// [`default_reqwest_settings`]: crate::net::default_reqwest_settings pub fn timeout(self, timeout: Duration) -> Self { Self { timeout: Some(timeout), ..self } } - /// Set limit. + /// Limit the number of updates to be retrieved at once. Values between + /// 1—100 are accepted. /// /// ## Panics /// @@ -51,14 +59,17 @@ where Self { limit: Some(limit), ..self } } - /// Set allowed updates. + /// A list of the types of updates you want to receive. /// /// ## Note /// - /// Teloxide normally (when using [`Dispatcher`] or repls) sets this - /// automatically. + /// Teloxide normally (when using [`Dispatcher`] or [`repl`]s) sets this + /// automatically via [`hint_allowed_updates`], so you rarely need to use + /// `allowed_updates` explicitly. /// /// [`Dispatcher`]: crate::dispatching::Dispatcher + /// [`repl`]: fn@crate::repl + /// [`hint_allowed_updates`]: crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates pub fn allowed_updates(self, allowed_updates: Vec) -> Self { Self { allowed_updates: Some(allowed_updates), ..self } } @@ -70,10 +81,14 @@ where self } - /// Creates a polling update listener. + /// Returns a long polling update listener with configuration from the + /// builder. + /// + /// See also: [`polling_default`], [`Polling`]. pub fn build(self) -> Polling { let Self { bot, timeout, limit, allowed_updates } = self; - polling(bot, timeout, limit, allowed_updates) + let (token, flag) = AsyncStopToken::new_pair(); + Polling { bot, timeout, limit, allowed_updates, flag, token } } } @@ -101,59 +116,77 @@ where polling_builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build() } -#[cfg_attr(doc, aquamarine::aquamarine)] /// Returns a long polling update listener with some additional options. -/// -/// - `bot`: Using this bot, the returned update listener will receive updates. -/// - `timeout`: A timeout in seconds for polling. -/// - `limit`: Limits the number of updates to be retrieved at once. Values -/// between 1—100 are accepted. -/// - `allowed_updates`: A list the types of updates you want to receive. -/// -/// See [`GetUpdates`] for defaults. -/// -/// See also: [`polling_default`](polling_default). -/// -/// ## Notes -/// -/// - `timeout` should not be bigger than http client timeout, see -/// [`default_reqwest_settings`] for default http client settings. -/// - [`repl`]s and [`Dispatcher`] use [`hint_allowed_updates`] to set -/// `allowed_updates`, so you rarely need to pass `allowed_updates` -/// explicitly. -/// -/// [`default_reqwest_settings`]: teloxide::net::default_reqwest_settings -/// [`repl`]: fn@crate::repl -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`hint_allowed_updates`]: -/// crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates +#[deprecated(since = "0.7.0", note = "use `polling_builder` instead")] +pub fn polling( + bot: R, + timeout: Option, + limit: Option, + allowed_updates: Option>, +) -> Polling +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + let mut builder = polling_builder(bot); + builder.timeout = timeout; + builder.limit = limit; + builder.allowed_updates = allowed_updates; + builder.build() +} + +async fn delete_webhook_if_setup(requester: &R) +where + R: Requester, +{ + let webhook_info = match requester.get_webhook_info().send().await { + Ok(ok) => ok, + Err(e) => { + log::error!("Failed to get webhook info: {:?}", e); + return; + } + }; + + let is_webhook_setup = webhook_info.url.is_some(); + + if is_webhook_setup { + if let Err(e) = requester.delete_webhook().send().await { + log::error!("Failed to delete a webhook: {:?}", e); + } + } +} + +#[cfg_attr(doc, aquamarine::aquamarine)] +/// A polling update listener. /// /// ## How it works /// -/// Long polling works by repeatedly calling [`Bot::get_updates`][get_updates]. -/// If telegram has any updates, it returns them immediately, otherwise it waits -/// until either it has any updates or `timeout` expires. +/// Long polling works by repeatedly calling +/// [`Bot::get_updates`][get_updates]. If telegram has any updates, it +/// returns them immediately, otherwise it waits until either it has any +/// updates or `timeout` expires. /// -/// Each [`get_updates`][get_updates] call includes an `offset` parameter equal -/// to the latest update id + one, that allows to only receive updates that has -/// not been received before. +/// Each [`get_updates`][get_updates] call includes an `offset` parameter +/// equal to the latest update id + one, that allows to only receive +/// updates that has not been received before. /// -/// When telegram receives a [`get_updates`][get_updates] request with `offset = -/// N` it forgets any updates with id < `N`. When `polling` listener is stopped, -/// it sends [`get_updates`][get_updates] with `timeout = 0, limit = 1` and -/// appropriate `offset`, so future bot restarts won't see updates that were -/// already seen. +/// When telegram receives a [`get_updates`][get_updates] request with +/// `offset = N` it forgets any updates with id < `N`. When `polling` +/// listener is stopped, it sends [`get_updates`][get_updates] with +/// `timeout = 0, limit = 1` and appropriate `offset`, so future bot +/// restarts won't see updates that were already seen. /// -/// Consumers of a `polling` update listener then need to repeatedly call +/// Consumers of a [`Polling`] update listener then need to repeatedly call /// [`futures::StreamExt::next`] to get the updates. /// -/// Here is an example diagram that shows these interactions between consumers -/// like [`Dispatcher`], `polling` update listener and telegram. +/// Here is an example diagram that shows these interactions between +/// consumers like [`Dispatcher`], [`Polling`] update listener and +/// telegram. /// /// ```mermaid /// sequenceDiagram /// participant C as Consumer -/// participant P as polling +/// participant P as Polling /// participant T as Telegram /// /// link C: Dispatcher @ ../struct.Dispatcher.html @@ -193,41 +226,7 @@ where /// ``` /// /// [get_updates]: crate::requests::Requester::get_updates -pub fn polling( - bot: R, - timeout: Option, - limit: Option, - allowed_updates: Option>, -) -> Polling -where - R: Requester + Send + 'static, - ::GetUpdates: Send, -{ - let (token, flag) = AsyncStopToken::new_pair(); - Polling { bot, timeout, limit, allowed_updates, flag, token } -} - -async fn delete_webhook_if_setup(requester: &R) -where - R: Requester, -{ - let webhook_info = match requester.get_webhook_info().send().await { - Ok(ok) => ok, - Err(e) => { - log::error!("Failed to get webhook info: {:?}", e); - return; - } - }; - - let is_webhook_setup = webhook_info.url.is_some(); - - if is_webhook_setup { - if let Err(e) = requester.delete_webhook().send().await { - log::error!("Failed to delete a webhook: {:?}", e); - } - } -} - +/// [`Dispatcher`]: crate::dispatching::Dispatcher pub struct Polling { bot: B, timeout: Option, @@ -370,6 +369,7 @@ fn polling_is_send() { use crate::dispatching::update_listeners::AsUpdateStream; let bot = crate::Bot::new("TOKEN"); + #[allow(deprecated)] let mut polling = polling(bot, None, None, None); assert_send(&polling); From 60fc833108a41b07016ac12d00f3b2270e115a29 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 02:36:37 +0400 Subject: [PATCH 05/14] Remove useless use Former-commit-id: 58bbf8e7376625ea70837905411eb6b93270a894 --- src/dispatching/update_listeners/polling.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 00a5c013..1fc6273f 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -366,8 +366,6 @@ impl Stream for PollingStream<'_, B> { #[test] fn polling_is_send() { - use crate::dispatching::update_listeners::AsUpdateStream; - let bot = crate::Bot::new("TOKEN"); #[allow(deprecated)] let mut polling = polling(bot, None, None, None); From f9da86f88138738c0708cba0b4dd4c0163e294d3 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 02:58:48 +0400 Subject: [PATCH 06/14] Add option to drop pending updates w/ polling Former-commit-id: 612f47d242f89ffb256ed387ca2253231f8693fc --- src/dispatching/update_listeners/polling.rs | 50 ++++++++++++++++----- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 1fc6273f..e3ad7ab5 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -27,6 +27,7 @@ pub struct PollingBuilder { timeout: Option, limit: Option, allowed_updates: Option>, + drop_pending_updates: bool, } impl PollingBuilder @@ -74,6 +75,11 @@ where Self { allowed_updates: Some(allowed_updates), ..self } } + /// Drops pending updates. + pub fn drop_pending_updates(self) -> Self { + Self { drop_pending_updates: true, ..self } + } + /// Deletes webhook if it was set up. pub async fn delete_webhook(self) -> Self { delete_webhook_if_setup(&self.bot).await; @@ -86,9 +92,9 @@ where /// /// See also: [`polling_default`], [`Polling`]. pub fn build(self) -> Polling { - let Self { bot, timeout, limit, allowed_updates } = self; + let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; let (token, flag) = AsyncStopToken::new_pair(); - Polling { bot, timeout, limit, allowed_updates, flag, token } + Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token } } } @@ -98,7 +104,13 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - PollingBuilder { bot, timeout: None, limit: None, allowed_updates: None } + PollingBuilder { + bot, + timeout: None, + limit: None, + allowed_updates: None, + drop_pending_updates: false, + } } /// Returns a long polling update listener with `timeout` of 10 seconds. @@ -232,6 +244,7 @@ pub struct Polling { timeout: Option, limit: Option, allowed_updates: Option>, + drop_pending_updates: bool, flag: AsyncStopFlag, token: AsyncStopToken, } @@ -241,6 +254,9 @@ pub struct PollingStream<'a, B: Requester> { /// Parent structure polling: &'a mut Polling, + /// Whatever to drop pending updates or not. + drop_pending_updates: bool, + /// Timeout parameter for normal `get_updates()` calls. timeout: Option, /// Allowed updates parameter for the first `get_updates()` call. @@ -285,8 +301,10 @@ impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling { fn as_stream(&'a mut self) -> Self::Stream { let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); let allowed_updates = self.allowed_updates.clone(); + let drop_pending_updates = self.drop_pending_updates; PollingStream { polling: self, + drop_pending_updates, timeout, allowed_updates, offset: 0, @@ -326,7 +344,10 @@ impl Stream for PollingStream<'_, B> { *this.offset = upd.id + 1; } - *this.buffer = updates.into_iter(); + match *this.drop_pending_updates { + false => *this.buffer = updates.into_iter(), + true => *this.drop_pending_updates = false, + } } Err(err) => return Ready(Some(Err(err))), } @@ -337,13 +358,18 @@ impl Stream for PollingStream<'_, B> { return Ready(Some(Ok(upd))); } - // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` - // set last seen update (offset) and return immediately - let (timeout, limit) = if this.polling.flag.is_stopped() { - *this.stopping = true; - (Some(0), Some(1)) - } else { - (*this.timeout, this.polling.limit) + *this.stopping = this.polling.flag.is_stopped(); + let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) { + // Normal `get_updates()` call + (false, false) => (*this.offset, this.polling.limit, *this.timeout), + // Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending + // updates) + // + // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` + // set last seen update (offset) and return immediately + (true, _) => (*this.offset, Some(1), Some(0)), + // Drop pending updates + (_, true) => (-1, Some(1), Some(0)), }; let req = this @@ -351,7 +377,7 @@ impl Stream for PollingStream<'_, B> { .bot .get_updates() .with_payload_mut(|pay| { - pay.offset = Some(*this.offset); + pay.offset = Some(offset); pay.timeout = timeout; pay.limit = limit; pay.allowed_updates = this.allowed_updates.take(); From 09d189f689da882d273bcfe5fdf5d8279334917b Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 03:09:19 +0400 Subject: [PATCH 07/14] Panic for limit=0 Former-commit-id: b29d60657fcbca67e5e74520aba5671084d9898b --- src/dispatching/update_listeners/polling.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index e3ad7ab5..74c99bba 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -52,10 +52,11 @@ where /// /// ## Panics /// - /// If `limit` is greater than 100. + /// If `limit` is 0 or greater than 100. #[track_caller] pub fn limit(self, limit: u8) -> Self { - assert!(limit <= 100, "Maximum limit is 100"); + assert_ne!(limit, 0, "limit can't be 0"); + assert!(limit <= 100, "maximum limit is 100, can't set limit to `{limit}`"); Self { limit: Some(limit), ..self } } From a07af45ec7a417f35c6ee7592bf0766c5d395be4 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 03:15:52 +0400 Subject: [PATCH 08/14] Update changelog Former-commit-id: 4f2e723d56bc3d08b4ad6e280b2893f9a0d6c186 --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b0647a8..2baea730 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Security checks based on `secret_token` param of `set_webhook` to built-in webhooks +- `dispatching::update_listeners::{polling_builder, PollingBuilder, Polling, PollingStream}` ### Fixed @@ -17,6 +18,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Add the `Key: Clone` requirement for `impl Dispatcher` [**BC**]. + - `dispatching::update_listeners::{polling_default, polling}` now return a named, `Polling<_>` type + +### Deprecated + +- `dispatching::update_listeners::polling` ## 0.9.2 - 2022-06-07 From 79e393c44526f3417b1028a02de8302b972ac56b Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 03:23:04 +0400 Subject: [PATCH 09/14] Correct deprecated since version Former-commit-id: 195d34ba0c56fd78422780994cfcacddca33dda8 --- src/dispatching/update_listeners/polling.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 74c99bba..f71b8d1c 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -130,7 +130,7 @@ where } /// Returns a long polling update listener with some additional options. -#[deprecated(since = "0.7.0", note = "use `polling_builder` instead")] +#[deprecated(since = "0.10.0", note = "use `polling_builder` instead")] pub fn polling( bot: R, timeout: Option, From cce0710c8ccc0846df35def421a7aca0f4fe5161 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 03:28:23 +0400 Subject: [PATCH 10/14] Move `polling_builder` => `Polling::builder` Former-commit-id: 79f6cf4ee9ff0a13613b80cd3f690382916d91ca --- CHANGELOG.md | 2 +- src/dispatching/update_listeners.rs | 2 +- src/dispatching/update_listeners/polling.rs | 38 +++++++++++---------- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2baea730..67ef18dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Security checks based on `secret_token` param of `set_webhook` to built-in webhooks -- `dispatching::update_listeners::{polling_builder, PollingBuilder, Polling, PollingStream}` +- `dispatching::update_listeners::{PollingBuilder, Polling, PollingStream}` ### Fixed diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 677f681e..03a9fc5c 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -44,7 +44,7 @@ mod stateful_listener; #[allow(deprecated)] pub use self::{ - polling::{polling, polling_builder, polling_default, Polling, PollingBuilder, PollingStream}, + polling::{polling, polling_default, Polling, PollingBuilder, PollingStream}, stateful_listener::StatefulListener, }; diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index f71b8d1c..c41eeabe 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -99,21 +99,6 @@ where } } -/// Returns a builder for polling update listener. -pub fn polling_builder(bot: R) -> PollingBuilder -where - R: Requester + Send + 'static, - ::GetUpdates: Send, -{ - PollingBuilder { - bot, - timeout: None, - limit: None, - allowed_updates: None, - drop_pending_updates: false, - } -} - /// Returns a long polling update listener with `timeout` of 10 seconds. /// /// See also: [`polling_builder`]. @@ -126,11 +111,11 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - polling_builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build() + Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build() } /// Returns a long polling update listener with some additional options. -#[deprecated(since = "0.10.0", note = "use `polling_builder` instead")] +#[deprecated(since = "0.10.0", note = "use `Polling::builder()` instead")] pub fn polling( bot: R, timeout: Option, @@ -141,7 +126,7 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - let mut builder = polling_builder(bot); + let mut builder = Polling::builder(bot); builder.timeout = timeout; builder.limit = limit; builder.allowed_updates = allowed_updates; @@ -250,6 +235,23 @@ pub struct Polling { token: AsyncStopToken, } +impl Polling +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + /// Returns a builder for polling update listener. + pub fn builder(bot: R) -> PollingBuilder { + PollingBuilder { + bot, + timeout: None, + limit: None, + allowed_updates: None, + drop_pending_updates: false, + } + } +} + #[pin_project::pin_project] pub struct PollingStream<'a, B: Requester> { /// Parent structure From 4e5e7a145d133516577964c771b2b3aaf76f0cb3 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 27 Jun 2022 03:32:57 +0400 Subject: [PATCH 11/14] Fix docs Former-commit-id: ee52bb28b4d8c1781743f85bcfe044f1f135ef9d --- src/dispatching/update_listeners/polling.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index c41eeabe..3eeba061 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -22,6 +22,8 @@ use crate::{ }; /// Builder for polling update listener. +/// +/// Can be created by [`Polling::builder`]. pub struct PollingBuilder { bot: R, timeout: Option, @@ -101,7 +103,7 @@ where /// Returns a long polling update listener with `timeout` of 10 seconds. /// -/// See also: [`polling_builder`]. +/// See also: [`Polling::builder`]. /// /// ## Notes /// From f0608da9c3648c6d8a2783e90ab449e798e7c517 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 4 Jul 2022 23:39:04 +0400 Subject: [PATCH 12/14] When creating `Polling` assert that it's an `UpdateListener` Former-commit-id: 08da55f54f827af1944053a1897918f0d6075ac8 --- src/dispatching/update_listeners.rs | 8 ++++++++ src/dispatching/update_listeners/polling.rs | 14 ++++++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 03a9fc5c..9c4abc2f 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -126,3 +126,11 @@ pub trait AsUpdateStream<'a, E> { /// [`Stream`]: AsUpdateStream::Stream fn as_stream(&'a mut self) -> Self::Stream; } + +#[inline(always)] +pub(crate) fn assert_update_listener(listener: L) -> L +where + L: UpdateListener, +{ + listener +} diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 3eeba061..9a2ee7d6 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -15,7 +15,7 @@ use futures::{ready, stream::Stream}; use crate::{ dispatching::{ stop_token::{AsyncStopFlag, AsyncStopToken}, - update_listeners::{AsUpdateStream, UpdateListener}, + update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, }, requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, Update}, @@ -97,7 +97,10 @@ where pub fn build(self) -> Polling { let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; let (token, flag) = AsyncStopToken::new_pair(); - Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token } + let polling = + Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token }; + + assert_update_listener(polling) } } @@ -113,7 +116,10 @@ where R: Requester + Send + 'static, ::GetUpdates: Send, { - Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build() + let polling = + Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build(); + + assert_update_listener(polling) } /// Returns a long polling update listener with some additional options. @@ -132,7 +138,7 @@ where builder.timeout = timeout; builder.limit = limit; builder.allowed_updates = allowed_updates; - builder.build() + assert_update_listener(builder.build()) } async fn delete_webhook_if_setup(requester: &R) From 1ff0440762431f190468e088ffd3eecd28f23496 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 4 Jul 2022 23:45:49 +0400 Subject: [PATCH 13/14] Mark `Polling` and `PollingBuilder` as `must_use` Former-commit-id: 430df0cefbdbf442b930701125ba7cf01cfdecc0 --- src/dispatching/update_listeners/polling.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 9a2ee7d6..906ec98c 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -24,6 +24,7 @@ use crate::{ /// Builder for polling update listener. /// /// Can be created by [`Polling::builder`]. +#[must_use = "`PollingBuilder` is a builder and does nothing unless used"] pub struct PollingBuilder { bot: R, timeout: Option, @@ -233,6 +234,7 @@ where /// /// [get_updates]: crate::requests::Requester::get_updates /// [`Dispatcher`]: crate::dispatching::Dispatcher +#[must_use = "`Polling` is an update listener and does nothing unless used"] pub struct Polling { bot: B, timeout: Option, From 8cecf248b240873c438ea4895e3dcd26c3398440 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Tue, 5 Jul 2022 00:07:15 +0400 Subject: [PATCH 14/14] Make fields of `PollingBuilder` public Former-commit-id: cd63dbf1e7d76bd98bfa239e09c357c76d97cb4e --- src/dispatching/update_listeners/polling.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 906ec98c..118fc02e 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -24,13 +24,14 @@ use crate::{ /// Builder for polling update listener. /// /// Can be created by [`Polling::builder`]. +#[non_exhaustive] #[must_use = "`PollingBuilder` is a builder and does nothing unless used"] pub struct PollingBuilder { - bot: R, - timeout: Option, - limit: Option, - allowed_updates: Option>, - drop_pending_updates: bool, + pub bot: R, + pub timeout: Option, + pub limit: Option, + pub allowed_updates: Option>, + pub drop_pending_updates: bool, } impl PollingBuilder