diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index d74d36cf..1d9652bb 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -96,33 +96,30 @@ //! //! [`UpdateListener`]: UpdateListener //! [`polling_default`]: polling_default -//! [`polling`]: polling +//! [`polling`]: polling() //! [`Box::get_updates`]: crate::requests::Requester::get_updates //! [getting updates]: https://core.telegram.org/bots/api#getting-updates //! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling //! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science) //! [webhook]: https://en.wikipedia.org/wiki/Webhook -use futures::{ - future::{ready, Either}, - stream, Stream, StreamExt, -}; +use futures::Stream; -use std::{convert::TryInto, time::Duration}; -use teloxide_core::{ - payloads::GetUpdates, - requests::{HasPayload, Request, Requester}, - types::{AllowedUpdate, SemiparsedVec, Update}, -}; +use std::time::Duration; -use crate::dispatching::stop_token::{AsyncStopFlag, AsyncStopToken, StopToken}; +use crate::{dispatching::stop_token::StopToken, types::Update}; + +mod polling; +mod stateful_listener; + +pub use self::polling::{polling, polling_default}; /// An update listener. /// /// Implementors of this trait allow getting updates from Telegram. /// /// Currently Telegram has 2 ways of getting updates -- [polling] and -/// [webhooks]. Currently, only the former one is implemented (see [`polling`] +/// [webhooks]. Currently, only the former one is implemented (see [`polling()`] /// and [`polling_default`]) /// /// Some functions of this trait are located in the supertrait @@ -156,7 +153,7 @@ pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> { /// Timeout duration hint. /// /// This hints how often dispatcher should check for shutdown. E.g. for - /// [`polling`] this returns the [`timeout`]. + /// [`polling()`] this returns the [`timeout`]. /// /// [`timeout`]: crate::payloads::GetUpdates::timeout /// @@ -179,214 +176,3 @@ pub trait AsUpdateStream<'a, E> { /// [`Stream`]: AsUpdateStream::Stream fn as_stream(&'a mut self) -> Self::Stream; } - -/// Returns a long polling update listener with `timeout` of 10 seconds. -/// -/// See also: [`polling`](polling). -/// -/// ## Notes -/// -/// This function will automatically delete a webhook if it was set up. -pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err> -where - R: Requester + 'static, - <R as Requester>::GetUpdatesFaultTolerant: Send, -{ - delete_webhook_if_setup(&requester).await; - polling(requester, Some(Duration::from_secs(10)), None, None) -} - -/// Returns a long/short polling update listener with some additional options. -/// -/// - `bot`: Using this bot, the returned update listener will receive updates. -/// - `timeout`: A timeout for polling. -/// - `limit`: Limits the number of updates to be retrieved at once. Values -/// between 1—100 are accepted. -/// - `allowed_updates`: A list the types of updates you want to receive. -/// See [`GetUpdates`] for defaults. -/// -/// See also: [`polling_default`](polling_default). -/// -/// [`GetUpdates`]: crate::payloads::GetUpdates -pub fn polling<R>( - requester: R, - timeout: Option<Duration>, - limit: Option<u8>, - allowed_updates: Option<Vec<AllowedUpdate>>, -) -> impl UpdateListener<R::Err> -where - R: Requester + 'static, - <R as Requester>::GetUpdatesFaultTolerant: Send, -{ - struct State<B: Requester> { - bot: B, - timeout: Option<u32>, - limit: Option<u8>, - allowed_updates: Option<Vec<AllowedUpdate>>, - offset: i32, - flag: AsyncStopFlag, - token: AsyncStopToken, - } - - fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_ - where - B: Requester, - { - stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; - - if flag.is_stopped() { - let mut req = bot.get_updates_fault_tolerant(); - - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: Some(0), - limit: Some(1), - allowed_updates: allowed_updates.take(), - }; - - return match req.send().await { - Ok(_) => None, - Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), - }; - } - - let mut req = bot.get_updates_fault_tolerant(); - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: *timeout, - limit: *limit, - allowed_updates: allowed_updates.take(), - }; - - let updates = match req.send().await { - Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), - Ok(SemiparsedVec(updates)) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), - }; - - *offset = id + 1; - } - - for update in &updates { - if let Err((value, e)) = update { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide-core/issues.", - e, - value - ); - } - } - - updates.into_iter().filter_map(Result::ok).map(Ok) - } - }; - - Some((Either::Right(stream::iter(updates)), state)) - }) - .flatten() - } - - let (token, flag) = AsyncStopToken::new_pair(); - - let state = State { - bot: requester, - timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), - limit, - allowed_updates, - offset: 0, - flag, - token, - }; - - let stop = |st: &mut State<_>| st.token.clone(); - - let timeout_hint = Some(move |_: &State<_>| timeout); - - StatefulListener { state, stream, stop, timeout_hint } -} - -async fn delete_webhook_if_setup<R>(requester: &R) -where - R: Requester, -{ - let webhook_info = match requester.get_webhook_info().send().await { - Ok(ok) => ok, - Err(e) => { - log::error!("Failed to get webhook info: {:?}", e); - return; - } - }; - - let is_webhook_setup = !webhook_info.url.is_empty(); - - if is_webhook_setup { - if let Err(e) = requester.delete_webhook().send().await { - log::error!("Failed to delete a webhook: {:?}", e); - } - } -} - -/// A listener created from `state` and `stream`/`stop` functions. -struct StatefulListener<St, Assf, Sf, Thf> { - /// The state of the listener. - state: St, - - /// Function used as `AsUpdateStream::as_stream`. - /// - /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by - /// `&mut`. - stream: Assf, - - /// Function used as `UpdateListener::stop`. - /// - /// Must be of type `for<'a> &'a mut St -> impl StopToken`. - stop: Sf, - - /// Function used as `UpdateListener::timeout_hint`. - /// - /// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by - /// `&`. - timeout_hint: Option<Thf>, -} - -impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf> -where - (St, Strm): 'a, - Assf: FnMut(&'a mut St) -> Strm, - Strm: Stream<Item = Result<Update, E>>, -{ - type Stream = Strm; - - fn as_stream(&'a mut self) -> Self::Stream { - (self.stream)(&mut self.state) - } -} - -impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf> -where - Self: for<'a> AsUpdateStream<'a, E>, - Sf: FnMut(&mut St) -> Stt, - Stt: StopToken, - Thf: Fn(&St) -> Option<Duration>, -{ - type StopToken = Stt; - - fn stop_token(&mut self) -> Stt { - (self.stop)(&mut self.state) - } - - fn timeout_hint(&self) -> Option<Duration> { - self.timeout_hint.as_ref().and_then(|f| f(&self.state)) - } -} diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs new file mode 100644 index 00000000..6d65dc45 --- /dev/null +++ b/src/dispatching/update_listeners/polling.rs @@ -0,0 +1,173 @@ +use std::{convert::TryInto, time::Duration}; + +use futures::{ + future::{ready, Either}, + stream::{self, Stream, StreamExt}, +}; + +use crate::{ + dispatching::{ + stop_token::{AsyncStopFlag, AsyncStopToken}, + update_listeners::{stateful_listener::StatefulListener, UpdateListener}, + }, + payloads::GetUpdates, + requests::{HasPayload, Request, Requester}, + types::{AllowedUpdate, SemiparsedVec, Update}, +}; + +/// Returns a long polling update listener with `timeout` of 10 seconds. +/// +/// See also: [`polling`](polling). +/// +/// ## Notes +/// +/// This function will automatically delete a webhook if it was set up. +pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err> +where + R: Requester + 'static, + <R as Requester>::GetUpdatesFaultTolerant: Send, +{ + delete_webhook_if_setup(&requester).await; + polling(requester, Some(Duration::from_secs(10)), None, None) +} + +/// Returns a long/short polling update listener with some additional options. +/// +/// - `bot`: Using this bot, the returned update listener will receive updates. +/// - `timeout`: A timeout for polling. +/// - `limit`: Limits the number of updates to be retrieved at once. Values +/// between 1—100 are accepted. +/// - `allowed_updates`: A list the types of updates you want to receive. +/// See [`GetUpdates`] for defaults. +/// +/// See also: [`polling_default`](polling_default). +/// +/// [`GetUpdates`]: crate::payloads::GetUpdates +pub fn polling<R>( + requester: R, + timeout: Option<Duration>, + limit: Option<u8>, + allowed_updates: Option<Vec<AllowedUpdate>>, +) -> impl UpdateListener<R::Err> +where + R: Requester + 'static, + <R as Requester>::GetUpdatesFaultTolerant: Send, +{ + struct State<B: Requester> { + bot: B, + timeout: Option<u32>, + limit: Option<u8>, + allowed_updates: Option<Vec<AllowedUpdate>>, + offset: i32, + flag: AsyncStopFlag, + token: AsyncStopToken, + } + + fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_ + where + B: Requester, + { + stream::unfold(st, move |state| async move { + let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; + + if flag.is_stopped() { + let mut req = bot.get_updates_fault_tolerant(); + + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: Some(0), + limit: Some(1), + allowed_updates: allowed_updates.take(), + }; + + return match req.send().await { + Ok(_) => None, + Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), + }; + } + + let mut req = bot.get_updates_fault_tolerant(); + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: *timeout, + limit: *limit, + allowed_updates: allowed_updates.take(), + }; + + let updates = match req.send().await { + Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), + Ok(SemiparsedVec(updates)) => { + // Set offset to the last update's id + 1 + if let Some(upd) = updates.last() { + let id: i32 = match upd { + Ok(ok) => ok.id, + Err((value, _)) => value["update_id"] + .as_i64() + .expect("The 'update_id' field must always exist in Update") + .try_into() + .expect("update_id must be i32"), + }; + + *offset = id + 1; + } + + for update in &updates { + if let Err((value, e)) = update { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide-core/issues.", + e, + value + ); + } + } + + updates.into_iter().filter_map(Result::ok).map(Ok) + } + }; + + Some((Either::Right(stream::iter(updates)), state)) + }) + .flatten() + } + + let (token, flag) = AsyncStopToken::new_pair(); + + let state = State { + bot: requester, + timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), + limit, + allowed_updates, + offset: 0, + flag, + token, + }; + + let stop = |st: &mut State<_>| st.token.clone(); + + let timeout_hint = Some(move |_: &State<_>| timeout); + + StatefulListener { state, stream, stop, timeout_hint } +} + +async fn delete_webhook_if_setup<R>(requester: &R) +where + R: Requester, +{ + let webhook_info = match requester.get_webhook_info().send().await { + Ok(ok) => ok, + Err(e) => { + log::error!("Failed to get webhook info: {:?}", e); + return; + } + }; + + let is_webhook_setup = !webhook_info.url.is_empty(); + + if is_webhook_setup { + if let Err(e) = requester.delete_webhook().send().await { + log::error!("Failed to delete a webhook: {:?}", e); + } + } +} diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs new file mode 100644 index 00000000..8d6b2913 --- /dev/null +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -0,0 +1,63 @@ +use std::time::Duration; + +use futures::Stream; +use teloxide_core::types::Update; + +use crate::dispatching::{ + stop_token::StopToken, + update_listeners::{AsUpdateStream, UpdateListener}, +}; + +/// A listener created from `state` and `stream`/`stop` functions. +pub(crate) struct StatefulListener<St, Assf, Sf, Thf> { + /// The state of the listener. + pub(crate) state: St, + + /// Function used as `AsUpdateStream::as_stream`. + /// + /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by + /// `&mut`. + pub(crate) stream: Assf, + + /// Function used as `UpdateListener::stop`. + /// + /// Must be of type `for<'a> &'a mut St -> impl StopToken`. + pub(crate) stop: Sf, + + /// Function used as `UpdateListener::timeout_hint`. + /// + /// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by + /// `&`. + pub(crate) timeout_hint: Option<Thf>, +} + +impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf> +where + (St, Strm): 'a, + Assf: FnMut(&'a mut St) -> Strm, + Strm: Stream<Item = Result<Update, E>>, +{ + type Stream = Strm; + + fn as_stream(&'a mut self) -> Self::Stream { + (self.stream)(&mut self.state) + } +} + +impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf> +where + Self: for<'a> AsUpdateStream<'a, E>, + Sf: FnMut(&mut St) -> Stt, + Stt: StopToken, + Thf: Fn(&St) -> Option<Duration>, +{ + type StopToken = Stt; + + fn stop_token(&mut self) -> Stt { + (self.stop)(&mut self.state) + } + + fn timeout_hint(&self) -> Option<Duration> { + self.timeout_hint.as_ref().and_then(|f| f(&self.state)) + } +}