diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b0647a8..67ef18dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Security checks based on `secret_token` param of `set_webhook` to built-in webhooks +- `dispatching::update_listeners::{PollingBuilder, Polling, PollingStream}` ### Fixed @@ -17,6 +18,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Add the `Key: Clone` requirement for `impl Dispatcher` [**BC**]. + - `dispatching::update_listeners::{polling_default, polling}` now return a named, `Polling<_>` type + +### Deprecated + +- `dispatching::update_listeners::polling` ## 0.9.2 - 2022-06-07 diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index f3e358c5..9c4abc2f 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -42,8 +42,9 @@ use crate::{ mod polling; mod stateful_listener; +#[allow(deprecated)] pub use self::{ - polling::{polling, polling_default}, + polling::{polling, polling_default, Polling, PollingBuilder, PollingStream}, stateful_listener::StatefulListener, }; @@ -125,3 +126,11 @@ pub trait AsUpdateStream<'a, E> { /// [`Stream`]: AsUpdateStream::Stream fn as_stream(&'a mut self) -> Self::Stream; } + +#[inline(always)] +pub(crate) fn assert_update_listener(listener: L) -> L +where + L: UpdateListener, +{ + listener +} diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 3987779d..118fc02e 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -1,89 +1,200 @@ -use std::{convert::TryInto, time::Duration}; - -use futures::{ - future::{ready, Either}, - stream::{self, Stream, StreamExt}, +use std::{ + convert::TryInto, + future::Future, + pin::Pin, + task::{ + self, + Poll::{self, Ready}, + }, + time::Duration, + vec, }; +use futures::{ready, stream::Stream}; + use crate::{ dispatching::{ stop_token::{AsyncStopFlag, AsyncStopToken}, - update_listeners::{stateful_listener::StatefulListener, UpdateListener}, + update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, }, - payloads::{GetUpdates, GetUpdatesSetters as _}, requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, Update}, }; -/// Returns a long polling update listener with `timeout` of 10 seconds. +/// Builder for polling update listener. /// -/// See also: [`polling`](polling). -/// -/// ## Notes -/// -/// This function will automatically delete a webhook if it was set up. -pub async fn polling_default(requester: R) -> impl UpdateListener +/// Can be created by [`Polling::builder`]. +#[non_exhaustive] +#[must_use = "`PollingBuilder` is a builder and does nothing unless used"] +pub struct PollingBuilder { + pub bot: R, + pub timeout: Option, + pub limit: Option, + pub allowed_updates: Option>, + pub drop_pending_updates: bool, +} + +impl PollingBuilder where R: Requester + Send + 'static, ::GetUpdates: Send, { - delete_webhook_if_setup(&requester).await; - polling(requester, Some(Duration::from_secs(10)), None, None) + /// A timeout in seconds for polling. + /// + /// ## Note + /// + /// `timeout` should not be bigger than http client timeout, see + /// [`default_reqwest_settings`] for default http client settings. + /// + /// [`default_reqwest_settings`]: crate::net::default_reqwest_settings + pub fn timeout(self, timeout: Duration) -> Self { + Self { timeout: Some(timeout), ..self } + } + + /// Limit the number of updates to be retrieved at once. Values between + /// 1—100 are accepted. + /// + /// ## Panics + /// + /// If `limit` is 0 or greater than 100. + #[track_caller] + pub fn limit(self, limit: u8) -> Self { + assert_ne!(limit, 0, "limit can't be 0"); + assert!(limit <= 100, "maximum limit is 100, can't set limit to `{limit}`"); + + Self { limit: Some(limit), ..self } + } + + /// A list of the types of updates you want to receive. + /// + /// ## Note + /// + /// Teloxide normally (when using [`Dispatcher`] or [`repl`]s) sets this + /// automatically via [`hint_allowed_updates`], so you rarely need to use + /// `allowed_updates` explicitly. + /// + /// [`Dispatcher`]: crate::dispatching::Dispatcher + /// [`repl`]: fn@crate::repl + /// [`hint_allowed_updates`]: crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates + pub fn allowed_updates(self, allowed_updates: Vec) -> Self { + Self { allowed_updates: Some(allowed_updates), ..self } + } + + /// Drops pending updates. + pub fn drop_pending_updates(self) -> Self { + Self { drop_pending_updates: true, ..self } + } + + /// Deletes webhook if it was set up. + pub async fn delete_webhook(self) -> Self { + delete_webhook_if_setup(&self.bot).await; + + self + } + + /// Returns a long polling update listener with configuration from the + /// builder. + /// + /// See also: [`polling_default`], [`Polling`]. + pub fn build(self) -> Polling { + let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; + let (token, flag) = AsyncStopToken::new_pair(); + let polling = + Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token }; + + assert_update_listener(polling) + } } -#[cfg_attr(doc, aquamarine::aquamarine)] -/// Returns a long polling update listener with some additional options. +/// Returns a long polling update listener with `timeout` of 10 seconds. /// -/// - `bot`: Using this bot, the returned update listener will receive updates. -/// - `timeout`: A timeout in seconds for polling. -/// - `limit`: Limits the number of updates to be retrieved at once. Values -/// between 1—100 are accepted. -/// - `allowed_updates`: A list the types of updates you want to receive. -/// -/// See [`GetUpdates`] for defaults. -/// -/// See also: [`polling_default`](polling_default). +/// See also: [`Polling::builder`]. /// /// ## Notes /// -/// - `timeout` should not be bigger than http client timeout, see -/// [`default_reqwest_settings`] for default http client settings. -/// - [`repl`]s and [`Dispatcher`] use [`hint_allowed_updates`] to set -/// `allowed_updates`, so you rarely need to pass `allowed_updates` -/// explicitly. -/// -/// [`default_reqwest_settings`]: teloxide::net::default_reqwest_settings -/// [`repl`]: fn@crate::repl -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`hint_allowed_updates`]: -/// crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates +/// This function will automatically delete a webhook if it was set up. +pub async fn polling_default(bot: R) -> Polling +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + let polling = + Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build(); + + assert_update_listener(polling) +} + +/// Returns a long polling update listener with some additional options. +#[deprecated(since = "0.10.0", note = "use `Polling::builder()` instead")] +pub fn polling( + bot: R, + timeout: Option, + limit: Option, + allowed_updates: Option>, +) -> Polling +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + let mut builder = Polling::builder(bot); + builder.timeout = timeout; + builder.limit = limit; + builder.allowed_updates = allowed_updates; + assert_update_listener(builder.build()) +} + +async fn delete_webhook_if_setup(requester: &R) +where + R: Requester, +{ + let webhook_info = match requester.get_webhook_info().send().await { + Ok(ok) => ok, + Err(e) => { + log::error!("Failed to get webhook info: {:?}", e); + return; + } + }; + + let is_webhook_setup = webhook_info.url.is_some(); + + if is_webhook_setup { + if let Err(e) = requester.delete_webhook().send().await { + log::error!("Failed to delete a webhook: {:?}", e); + } + } +} + +#[cfg_attr(doc, aquamarine::aquamarine)] +/// A polling update listener. /// /// ## How it works /// -/// Long polling works by repeatedly calling [`Bot::get_updates`][get_updates]. -/// If telegram has any updates, it returns them immediately, otherwise it waits -/// until either it has any updates or `timeout` expires. +/// Long polling works by repeatedly calling +/// [`Bot::get_updates`][get_updates]. If telegram has any updates, it +/// returns them immediately, otherwise it waits until either it has any +/// updates or `timeout` expires. /// -/// Each [`get_updates`][get_updates] call includes an `offset` parameter equal -/// to the latest update id + one, that allows to only receive updates that has -/// not been received before. +/// Each [`get_updates`][get_updates] call includes an `offset` parameter +/// equal to the latest update id + one, that allows to only receive +/// updates that has not been received before. /// -/// When telegram receives a [`get_updates`][get_updates] request with `offset = -/// N` it forgets any updates with id < `N`. When `polling` listener is stopped, -/// it sends [`get_updates`][get_updates] with `timeout = 0, limit = 1` and -/// appropriate `offset`, so future bot restarts won't see updates that were -/// already seen. +/// When telegram receives a [`get_updates`][get_updates] request with +/// `offset = N` it forgets any updates with id < `N`. When `polling` +/// listener is stopped, it sends [`get_updates`][get_updates] with +/// `timeout = 0, limit = 1` and appropriate `offset`, so future bot +/// restarts won't see updates that were already seen. /// -/// Consumers of a `polling` update listener then need to repeatedly call +/// Consumers of a [`Polling`] update listener then need to repeatedly call /// [`futures::StreamExt::next`] to get the updates. /// -/// Here is an example diagram that shows these interactions between consumers -/// like [`Dispatcher`], `polling` update listener and telegram. +/// Here is an example diagram that shows these interactions between +/// consumers like [`Dispatcher`], [`Polling`] update listener and +/// telegram. /// /// ```mermaid /// sequenceDiagram /// participant C as Consumer -/// participant P as polling +/// participant P as Polling /// participant T as Telegram /// /// link C: Dispatcher @ ../struct.Dispatcher.html @@ -123,131 +234,180 @@ where /// ``` /// /// [get_updates]: crate::requests::Requester::get_updates -pub fn polling( - bot: R, +/// [`Dispatcher`]: crate::dispatching::Dispatcher +#[must_use = "`Polling` is an update listener and does nothing unless used"] +pub struct Polling { + bot: B, timeout: Option, limit: Option, allowed_updates: Option>, -) -> impl UpdateListener + drop_pending_updates: bool, + flag: AsyncStopFlag, + token: AsyncStopToken, +} + +impl Polling where R: Requester + Send + 'static, ::GetUpdates: Send, { - struct State { - bot: B, - timeout: Option, - limit: Option, - allowed_updates: Option>, - offset: i32, - flag: AsyncStopFlag, - token: AsyncStopToken, - force_stop: bool, + /// Returns a builder for polling update listener. + pub fn builder(bot: R) -> PollingBuilder { + PollingBuilder { + bot, + timeout: None, + limit: None, + allowed_updates: None, + drop_pending_updates: false, + } } - - fn stream(st: &mut State) -> impl Stream> + Send + '_ - where - B: Requester + Send, - ::GetUpdates: Send, - { - stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, flag, force_stop, .. } = - &mut *state; - - if *force_stop { - return None; - } - - if flag.is_stopped() { - let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1); - req.payload_mut().allowed_updates = allowed_updates.take(); - - return match req.send().await { - Ok(_) => None, - Err(err) => { - // Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496 - *force_stop = true; - - Some((Either::Left(stream::once(ready(Err(err)))), state)) - } - }; - } - - let mut req = bot.get_updates(); - *req.payload_mut() = GetUpdates { - offset: Some(*offset), - timeout: *timeout, - limit: *limit, - allowed_updates: allowed_updates.take(), - }; - - match req.send().await { - Ok(updates) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - *offset = upd.id + 1; - } - - let updates = updates.into_iter().map(Ok); - Some((Either::Right(stream::iter(updates)), state)) - } - Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), - } - }) - .flatten() - } - - let (token, flag) = AsyncStopToken::new_pair(); - - let state = State { - bot, - timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), - limit, - allowed_updates, - offset: 0, - flag, - token, - force_stop: false, - }; - - let stop_token = |st: &mut State<_>| st.token.clone(); - - let hint_allowed_updates = - Some(|state: &mut State<_>, allowed: &mut dyn Iterator| { - // TODO: we should probably warn if there already were different allowed updates - // before - state.allowed_updates = Some(allowed.collect()); - }); - let timeout_hint = Some(move |_: &State<_>| timeout); - - StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint) } -async fn delete_webhook_if_setup(requester: &R) -where - R: Requester, -{ - let webhook_info = match requester.get_webhook_info().send().await { - Ok(ok) => ok, - Err(e) => { - log::error!("Failed to get webhook info: {:?}", e); - return; - } - }; +#[pin_project::pin_project] +pub struct PollingStream<'a, B: Requester> { + /// Parent structure + polling: &'a mut Polling, - let is_webhook_setup = webhook_info.url.is_some(); + /// Whatever to drop pending updates or not. + drop_pending_updates: bool, - if is_webhook_setup { - if let Err(e) = requester.delete_webhook().send().await { - log::error!("Failed to delete a webhook: {:?}", e); + /// Timeout parameter for normal `get_updates()` calls. + timeout: Option, + /// Allowed updates parameter for the first `get_updates()` call. + allowed_updates: Option>, + /// Offset parameter for normal `get_updates()` calls. + offset: i32, + + /// If this is set, return `None` from `poll_next` immediately. + force_stop: bool, + /// If true we've sent last `get_updates()` call for graceful shutdown. + stopping: bool, + + /// Buffer of updates to be yielded. + buffer: vec::IntoIter, + + /// In-flight `get_updates()` call. + #[pin] + in_flight: Option<::Send>, +} + +impl UpdateListener for Polling { + type StopToken = AsyncStopToken; + + fn stop_token(&mut self) -> Self::StopToken { + self.token.clone() + } + + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator) { + // TODO: we should probably warn if there already were different allowed updates + // before + self.allowed_updates = Some(hint.collect()); + } + + fn timeout_hint(&self) -> Option { + self.timeout + } +} + +impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling { + type Stream = PollingStream<'a, B>; + + fn as_stream(&'a mut self) -> Self::Stream { + let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); + let allowed_updates = self.allowed_updates.clone(); + let drop_pending_updates = self.drop_pending_updates; + PollingStream { + polling: self, + drop_pending_updates, + timeout, + allowed_updates, + offset: 0, + force_stop: false, + stopping: false, + buffer: Vec::new().into_iter(), + in_flight: None, } } } +impl Stream for PollingStream<'_, B> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + if *this.force_stop { + return Ready(None); + } + + // Poll in-flight future until completion + if let Some(in_flight) = this.in_flight.as_mut().as_pin_mut() { + let res = ready!(in_flight.poll(cx)); + this.in_flight.set(None); + + match res { + Ok(_) if *this.stopping => return Ready(None), + Err(err) if *this.stopping => { + // Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496 + *this.force_stop = true; + + return Ready(Some(Err(err))); + } + Ok(updates) => { + if let Some(upd) = updates.last() { + *this.offset = upd.id + 1; + } + + match *this.drop_pending_updates { + false => *this.buffer = updates.into_iter(), + true => *this.drop_pending_updates = false, + } + } + Err(err) => return Ready(Some(Err(err))), + } + } + + // If there are any buffered updates, return one + if let Some(upd) = this.buffer.next() { + return Ready(Some(Ok(upd))); + } + + *this.stopping = this.polling.flag.is_stopped(); + let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) { + // Normal `get_updates()` call + (false, false) => (*this.offset, this.polling.limit, *this.timeout), + // Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending + // updates) + // + // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` + // set last seen update (offset) and return immediately + (true, _) => (*this.offset, Some(1), Some(0)), + // Drop pending updates + (_, true) => (-1, Some(1), Some(0)), + }; + + let req = this + .polling + .bot + .get_updates() + .with_payload_mut(|pay| { + pay.offset = Some(offset); + pay.timeout = timeout; + pay.limit = limit; + pay.allowed_updates = this.allowed_updates.take(); + }) + .send(); + this.in_flight.set(Some(req)); + + // Recurse to poll `self.in_flight` + self.poll_next(cx) + } +} + #[test] fn polling_is_send() { - use crate::dispatching::update_listeners::AsUpdateStream; - let bot = crate::Bot::new("TOKEN"); + #[allow(deprecated)] let mut polling = polling(bot, None, None, None); assert_send(&polling);