diff --git a/Cargo.toml b/Cargo.toml index 472bdb86..abc07eaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ futures-preview = "0.3.0-alpha.19" async-trait = "0.1.13" thiserror = "1.0.2" serde_with_macros = "1.0.1" +either = "1.5.3" [features] default = [] diff --git a/media/FILTER_DP_FLOWCHART.png b/media/FILTER_DP_FLOWCHART.png new file mode 100644 index 00000000..53641075 Binary files /dev/null and b/media/FILTER_DP_FLOWCHART.png differ diff --git a/src/bot/execute.rs b/src/bot/execute.rs index b3c307d5..6368983d 100644 --- a/src/bot/execute.rs +++ b/src/bot/execute.rs @@ -76,10 +76,7 @@ impl Bot { /// # use teloxide::{Bot, requests::payloads::SendAnimation, types::InputFile}; /// # #[tokio::main] async fn main_() { /// let bot = Bot::new("TOKEN"); - /// let payload = SendAnimation::new( - /// 123456, - /// InputFile::Url(String::from("https://example.com")) - /// ); + /// let payload = SendAnimation::new(123456, InputFile::Url(String::from("https://example.com"))); /// bot.execute_multipart(&payload).await; /// # } /// ``` diff --git a/src/dispatching/dispatchers/filter/error_policy.rs b/src/dispatching/dispatchers/filter/error_policy.rs deleted file mode 100644 index 8d40bbb2..00000000 --- a/src/dispatching/dispatchers/filter/error_policy.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Infallible used here instead of `!` to be compatible with rust <1.41 -use std::{convert::Infallible, future::Future, pin::Pin}; - -use async_trait::async_trait; - -/// Implementors of this trait are treated as error-handlers. -#[async_trait] -pub trait ErrorPolicy { - async fn handle_error(&self, error: E) - where - E: 'async_trait; -} - -/// Error policy that silently ignores all errors -/// -/// ## Example -/// ``` -/// # #[tokio::main] -/// # async fn main_() { -/// use teloxide::dispatching::dispatchers::filter::error_policy::{ -/// ErrorPolicy, Ignore, -/// }; -/// -/// Ignore.handle_error(()).await; -/// Ignore.handle_error(404).await; -/// Ignore.handle_error(String::from("error")).await; -/// # } -/// ``` -pub struct Ignore; - -#[async_trait] -impl ErrorPolicy for Ignore -where - E: Send, -{ - async fn handle_error(&self, _: E) - where - E: 'async_trait, - { - } -} - -/// Error policy that silently ignores all errors that can never happen (e.g.: -/// [`!`] or [`Infallible`]) -/// -/// ## Examples -/// ``` -/// # #[tokio::main] -/// # async fn main_() { -/// use std::convert::{TryInto, Infallible}; -/// -/// use teloxide::dispatching::dispatchers::filter::error_policy::{ -/// ErrorPolicy, -/// IgnoreSafe, -/// }; -/// -/// let result: Result = "str".try_into(); -/// match result { -/// Ok(string) => println!("{}", string), -/// Err(inf) => IgnoreSafe.handle_error(inf).await, -/// } -/// -/// IgnoreSafe.handle_error(return).await; // return type of `return` is `!` (aka never) -/// # } -/// ``` -/// -/// ```compile_fail -/// use teloxide::dispatching::dispatchers::filter::error_policy::{ -/// ErrorPolicy, IgnoreSafe, -/// }; -/// -/// IgnoreSafe.handle_error(0); -/// ``` -/// -/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html -/// [`Infallible`]: std::convert::Infallible -pub struct IgnoreSafe; - -#[allow(unreachable_code)] -#[async_trait] -impl ErrorPolicy for IgnoreSafe { - async fn handle_error(&self, _: Infallible) - where - Infallible: 'async_trait, - { - } -} - -/// Implementation of `ErrorPolicy` for `async fn`s -/// -/// ## Example -/// ``` -/// # #[tokio::main] -/// # async fn main_() { -/// use teloxide::dispatching::dispatchers::filter::error_policy::ErrorPolicy; -/// -/// let closure = |e: i32| async move { eprintln!("Error code{}", e) }; -/// -/// closure.handle_error(404).await; -/// # } -/// ``` -impl ErrorPolicy for F -where - F: Fn(E) -> Fut + Sync, - Fut: Future + Send, - E: Send, -{ - fn handle_error<'s, 'async_trait>( - &'s self, - error: E, - ) -> Pin + Send + 'async_trait>> - where - 's: 'async_trait, - Self: 'async_trait, - E: 'async_trait, - { - Box::pin(async move { self(error).await }) - } -} diff --git a/src/dispatching/dispatchers/filter/mod.rs b/src/dispatching/dispatchers/filter/mod.rs deleted file mode 100644 index 89fb093d..00000000 --- a/src/dispatching/dispatchers/filter/mod.rs +++ /dev/null @@ -1,384 +0,0 @@ -use futures::StreamExt; - -use async_trait::async_trait; - -use crate::{ - dispatching::{ - dispatchers::filter::error_policy::ErrorPolicy, filters::Filter, - handler::Handler, updater::Updater, Dispatcher, - }, - types::{ - CallbackQuery, ChosenInlineResult, InlineQuery, Message, Update, - UpdateKind, - }, -}; - -pub mod error_policy; - -struct FilterAndHandler<'a, T, E> { - filter: Box + 'a>, - handler: Box + 'a>, -} - -impl<'a, T, E> FilterAndHandler<'a, T, E> { - fn new(filter: F, handler: H) -> Self - where - F: Filter + 'a, - H: Handler<'a, T, E> + 'a, - { - FilterAndHandler { - filter: Box::new(filter), - handler: Box::new(handler), - } - } -} - -type FiltersAndHandlers<'a, T, E> = Vec>; - -/// Dispatcher that dispatches updates from telegram. -/// -/// This is 'filter' implementation with following limitations: -/// - Error (`E` generic parameter) _must_ implement [`std::fmt::Debug`] -/// - All 'handlers' are boxed -/// - Handler's fututres are also boxed -/// - All errors from [updater] are ignored (TODO: remove this limitation) -/// - All handlers executed in order (this means that in dispatching have 2 -/// upadtes it will first execute some handler into complition with first -/// update and **then** search for handler for second update, this is probably -/// wrong) -/// -/// ## Examples -/// -/// Simplest example: -/// ```no_run -/// # async fn run() { -/// use std::convert::Infallible; -/// -/// use teloxide::{ -/// dispatching::{ -/// dispatchers::filter::{ -/// error_policy::ErrorPolicy, FilterDispatcher, -/// }, -/// updater::polling, -/// }, -/// types::Message, -/// Bot, -/// }; -/// -/// async fn handle_edited_message(mes: Message) { -/// println!("Edited message: {:?}", mes) -/// } -/// -/// let bot = Bot::new("TOKEN"); -/// -/// // create dispatching which handlers can't fail -/// // with error policy that just ignores all errors (that can't ever happen) -/// let mut dp = FilterDispatcher::::new(|_| async {}) -/// // Add 'handler' that will handle all messages sent to the bot -/// .message_handler(true, |mes: Message| async move { -/// println!("New message: {:?}", mes) -/// }) -/// // Add 'handler' that will handle all -/// // messages edited in chat with the bot -/// .edited_message_handler(true, handle_edited_message); -/// -/// // Start dispatching updates from long polling -/// dp.dispatch(polling(&bot)).await; -/// # } -/// ``` -/// -/// [`std::fmt::Debug`]: std::fmt::Debug -/// [updater]: crate::dispatching::updater -pub struct FilterDispatcher<'a, E, Ep> { - message_handlers: FiltersAndHandlers<'a, Message, E>, - edited_message_handlers: FiltersAndHandlers<'a, Message, E>, - channel_post_handlers: FiltersAndHandlers<'a, Message, E>, - edited_channel_post_handlers: FiltersAndHandlers<'a, Message, E>, - inline_query_handlers: FiltersAndHandlers<'a, InlineQuery, E>, - chosen_inline_result_handlers: - FiltersAndHandlers<'a, ChosenInlineResult, E>, - callback_query_handlers: FiltersAndHandlers<'a, CallbackQuery, E>, - error_policy: Ep, -} - -impl<'a, E, Ep> FilterDispatcher<'a, E, Ep> -where - Ep: ErrorPolicy, - E: std::fmt::Debug, // TODO: Is this really necessary? -{ - pub fn new(error_policy: Ep) -> Self { - FilterDispatcher { - message_handlers: Vec::new(), - edited_message_handlers: Vec::new(), - channel_post_handlers: Vec::new(), - edited_channel_post_handlers: Vec::new(), - inline_query_handlers: Vec::new(), - chosen_inline_result_handlers: Vec::new(), - callback_query_handlers: Vec::new(), - error_policy, - } - } - - pub fn message_handler(mut self, filter: F, handler: H) -> Self - where - F: Filter + 'a, - H: Handler<'a, Message, E> + 'a, - { - self.message_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self - where - F: Filter + 'a, - H: Handler<'a, Message, E> + 'a, - { - self.edited_message_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self - where - F: Filter + 'a, - H: Handler<'a, Message, E> + 'a, - { - self.channel_post_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - pub fn edited_channel_post_handler( - mut self, - filter: F, - handler: H, - ) -> Self - where - F: Filter + 'a, - H: Handler<'a, Message, E> + 'a, - { - self.edited_channel_post_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self - where - F: Filter + 'a, - H: Handler<'a, InlineQuery, E> + 'a, - { - self.inline_query_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - pub fn chosen_inline_result_handler( - mut self, - filter: F, - handler: H, - ) -> Self - where - F: Filter + 'a, - H: Handler<'a, ChosenInlineResult, E> + 'a, - { - self.chosen_inline_result_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self - where - F: Filter + 'a, - H: Handler<'a, CallbackQuery, E> + 'a, - { - self.callback_query_handlers - .push(FilterAndHandler::new(filter, handler)); - self - } - - // TODO: Can someone simplify this? - pub async fn dispatch(&mut self, updates: U) - where - U: Updater + 'a, - { - updates - .for_each(|res| { - async { - let Update { kind, id } = match res { - Ok(upd) => upd, - _ => return, // TODO: proper error handling - }; - - log::debug!( - "Handled update#{id:?}: {kind:?}", - id = id, - kind = kind - ); - - match kind { - UpdateKind::Message(mes) => { - self.handle(mes, &self.message_handlers).await - } - UpdateKind::EditedMessage(mes) => { - self.handle(mes, &self.edited_message_handlers) - .await; - } - UpdateKind::ChannelPost(post) => { - self.handle(post, &self.channel_post_handlers) - .await; - } - UpdateKind::EditedChannelPost(post) => { - self.handle( - post, - &self.edited_channel_post_handlers, - ) - .await; - } - UpdateKind::InlineQuery(query) => { - self.handle(query, &self.inline_query_handlers) - .await; - } - UpdateKind::ChosenInlineResult(result) => { - self.handle( - result, - &self.chosen_inline_result_handlers, - ) - .await; - } - UpdateKind::CallbackQuery(callback) => { - self.handle( - callback, - &self.callback_query_handlers, - ) - .await; - } - } - } - }) - .await; - } - - #[allow(clippy::ptr_arg)] // TODO: proper fix - async fn handle( - &self, - update: T, - handlers: &FiltersAndHandlers<'a, T, E>, - ) where - T: std::fmt::Debug, - { - for x in handlers { - if x.filter.test(&update) { - if let Err(err) = x.handler.handle(update).await { - self.error_policy.handle_error(err).await - } - - return; - } - } - - log::warn!("unhandled update {:?}", update); - } -} - -#[async_trait(? Send)] -impl<'a, U, E, Ep> Dispatcher<'a, U> for FilterDispatcher<'a, E, Ep> -where - E: std::fmt::Debug, - U: Updater + 'a, - Ep: ErrorPolicy, -{ - async fn dispatch(&'a mut self, updater: U) { - FilterDispatcher::dispatch(self, updater).await - } -} - -#[cfg(test)] -mod tests { - use std::{ - convert::Infallible, - sync::atomic::{AtomicI32, Ordering}, - }; - - use futures::Stream; - - use crate::{ - dispatching::{ - dispatchers::filter::FilterDispatcher, updater::StreamUpdater, - }, - types::{ - Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind, - Sender, Update, UpdateKind, User, - }, - }; - - #[tokio::test] - async fn first_handler_executes_1_time() { - let counter = &AtomicI32::new(0); - let counter2 = &AtomicI32::new(0); - - let mut dp = FilterDispatcher::::new(|_| async {}) - .message_handler(true, |_mes: Message| async move { - counter.fetch_add(1, Ordering::SeqCst); - }) - .message_handler(true, |_mes: Message| async move { - counter2.fetch_add(1, Ordering::SeqCst); - Ok::<_, Infallible>(()) - }); - - dp.dispatch(one_message_updater()).await; - - assert_eq!(counter.load(Ordering::SeqCst), 1); - assert_eq!(counter2.load(Ordering::SeqCst), 0); - } - - fn message() -> Message { - Message { - id: 6534, - date: 1_567_898_953, - chat: Chat { - id: 218_485_655, - photo: None, - kind: ChatKind::Private { - type_: (), - first_name: Some("W".to_string()), - last_name: None, - username: Some("WaffleLapkin".to_string()), - }, - }, - kind: MessageKind::Common { - from: Sender::User(User { - id: 457_569_668, - is_bot: true, - first_name: "BT".to_string(), - last_name: None, - username: Some("BloodyTestBot".to_string()), - language_code: None, - }), - forward_kind: ForwardKind::Origin { - reply_to_message: None, - }, - edit_date: None, - media_kind: MediaKind::Text { - text: "text".to_string(), - entities: vec![], - }, - reply_markup: None, - }, - } - } - - fn message_update() -> Update { - Update { - id: 0, - kind: UpdateKind::Message(message()), - } - } - - fn one_message_updater( - ) -> StreamUpdater>> { - use futures::{future::ready, stream}; - - StreamUpdater::new(stream::once(ready(Ok(message_update())))) - } -} diff --git a/src/dispatching/dispatchers/mod.rs b/src/dispatching/dispatchers/mod.rs deleted file mode 100644 index 1ec549e1..00000000 --- a/src/dispatching/dispatchers/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub use filter::FilterDispatcher; - -pub mod filter; diff --git a/src/dispatching/error_handlers.rs b/src/dispatching/error_handlers.rs new file mode 100644 index 00000000..acf8abf9 --- /dev/null +++ b/src/dispatching/error_handlers.rs @@ -0,0 +1,149 @@ +//! Error handlers. + +// Infallible used here instead of `!` to be compatible with rust <1.41. +use std::{convert::Infallible, fmt::Debug, future::Future, pin::Pin}; + +/// An asynchronous handler of an error. +pub trait ErrorHandler { + #[must_use] + fn handle_error<'a>( + &'a self, + error: E, + ) -> Pin + 'a>> + where + E: 'a; +} + +/// A handler that silently ignores all errors. +/// +/// ## Example +/// ``` +/// # #[tokio::main] +/// # async fn main_() { +/// use teloxide::dispatching::error_handlers::{ErrorHandler, Ignore}; +/// +/// Ignore.handle_error(()).await; +/// Ignore.handle_error(404).await; +/// Ignore.handle_error(String::from("error")).await; +/// # } +/// ``` +pub struct Ignore; + +impl ErrorHandler for Ignore { + #[must_use] + fn handle_error<'a>( + &'a self, + _: E, + ) -> Pin + 'a>> + where + E: 'a, + { + Box::pin(async {}) + } +} + +/// An error handler that silently ignores all errors that can never happen +/// (e.g.: [`!`] or [`Infallible`]). +/// +/// ## Examples +/// ``` +/// # #[tokio::main] +/// # async fn main_() { +/// use std::convert::{Infallible, TryInto}; +/// +/// use teloxide::dispatching::error_handlers::{ErrorHandler, IgnoreSafe}; +/// +/// let result: Result = "str".try_into(); +/// match result { +/// Ok(string) => println!("{}", string), +/// Err(inf) => IgnoreSafe.handle_error(inf).await, +/// } +/// +/// IgnoreSafe.handle_error(return).await; // return type of `return` is `!` (aka never) +/// # } +/// ``` +/// +/// ```compile_fail +/// use teloxide::dispatching::dispatchers::filter::error_policy::{ +/// ErrorPolicy, IgnoreSafe, +/// }; +/// +/// IgnoreSafe.handle_error(0); +/// ``` +/// +/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html +/// [`Infallible`]: std::convert::Infallible +pub struct IgnoreSafe; + +#[allow(unreachable_code)] +impl ErrorHandler for IgnoreSafe { + fn handle_error<'a>( + &'a self, + _: Infallible, + ) -> Pin + 'a>> + where + Infallible: 'a, + { + Box::pin(async {}) + } +} + +/// An error handler that prints all errors passed into it. +/// +/// ## Example +/// ``` +/// # #[tokio::main] +/// # async fn main_() { +/// use teloxide::dispatching::error_handlers::{ErrorHandler, Print}; +/// +/// Print.handle_error(()).await; +/// Print.handle_error(404).await; +/// Print.handle_error(String::from("error")).await; +/// # } +/// ``` +pub struct Print; + +impl ErrorHandler for Print +where + E: Debug, +{ + fn handle_error<'a>( + &'a self, + error: E, + ) -> Pin + 'a>> + where + E: 'a, + { + log::debug!("error: {:?}", error); + Box::pin(async {}) + } +} + +/// The implementation of `ErrorHandler` for `Fn(error) -> Future`. +/// +/// ## Example +/// ``` +/// # #[tokio::main] +/// # async fn main_() { +/// use teloxide::dispatching::error_handlers::ErrorHandler; +/// +/// let mut closure = |e: i32| async move { eprintln!("Error code{}", e) }; +/// +/// closure.handle_error(404).await; +/// # } +/// ``` +impl ErrorHandler for F +where + F: Fn(E) -> Fut, + Fut: Future, +{ + fn handle_error<'a>( + &'a self, + error: E, + ) -> Pin + 'a>> + where + E: 'a, + { + Box::pin(async move { self(error).await }) + } +} diff --git a/src/dispatching/filter_dp.rs b/src/dispatching/filter_dp.rs new file mode 100644 index 00000000..f7d38bc8 --- /dev/null +++ b/src/dispatching/filter_dp.rs @@ -0,0 +1,372 @@ +//! A dispatcher based on filters. + +use futures::StreamExt; + +use crate::{ + dispatching::{filters::Filter, ErrorHandler, Handler, Updater}, + types::{ + CallbackQuery, ChosenInlineResult, InlineQuery, Message, Update, + UpdateKind, + }, +}; +use either::Either; + +type FilterWithHandler<'a, T, E> = + (Box + 'a>, Box + 'a>); +type FiltersWithHandlers<'a, T, E> = Vec>; + +/// A dispatcher based on filters. +/// +/// It consists of: +/// 1. [`ErrorHandler`] than handles errors both from [`Updater`] and +/// [`Handler`]. +/// 2. Filters and handlers. +/// +/// First you register filters and handlers using the methods defined below, and +/// then you call [`.dispatch(updater)`]. Filters and handlers are executed in +/// order of registering. The following flowchart represents how this dispatcher +/// acts: +/// +///
+/// +///
+/// +/// ## Examples +/// +/// The simplest example: +/// ```no_run +/// # async fn run() { +/// use std::convert::Infallible; +/// +/// use teloxide::{ +/// dispatching::{updaters::polling_default, FilterDispatcher}, +/// types::Message, +/// Bot, +/// }; +/// +/// async fn handle_edited_message(mes: Message) -> Result<(), Infallible> { +/// println!("Edited message: {:?}", mes); +/// Ok(()) +/// } +/// +/// let bot = Bot::new("TOKEN"); +/// +/// // Create a dispatcher which handlers can't fail with the +/// // error handler that just ignores all errors (that can't ever happen). +/// let mut dp = FilterDispatcher::::new(|_| async {}) +/// // Add a handler, which handles all messages sent to the bot. +/// .message_handler(true, |mes: Message| async move { +/// println!("New message: {:?}", mes); +/// Ok(()) +/// }) +/// // Add a handler, which handles all messages edited in a chat +/// // with the bot. +/// .edited_message_handler(true, handle_edited_message); +/// +/// // Start dispatching updates using long polling. +/// dp.dispatch(polling_default(&bot)).await; +/// # } +/// ``` +/// +/// [`std::fmt::Debug`]: std::fmt::Debug +/// [updater]: crate::dispatching::updater +/// [`.dispatch(updater)`]: FilterDispatcher::dispatch +pub struct FilterDispatcher<'a, E, Eh> { + message_handlers: FiltersWithHandlers<'a, Message, E>, + edited_message_handlers: FiltersWithHandlers<'a, Message, E>, + channel_post_handlers: FiltersWithHandlers<'a, Message, E>, + edited_channel_post_handlers: FiltersWithHandlers<'a, Message, E>, + inline_query_handlers: FiltersWithHandlers<'a, InlineQuery, E>, + chosen_inline_result_handlers: + FiltersWithHandlers<'a, ChosenInlineResult, E>, + callback_query_handlers: FiltersWithHandlers<'a, CallbackQuery, E>, + error_handler: Eh, +} + +impl<'a, HandlerE, Eh> FilterDispatcher<'a, HandlerE, Eh> { + pub fn new(error_handler: Eh) -> Self + where + Eh: ErrorHandler>, + { + FilterDispatcher { + message_handlers: Vec::new(), + edited_message_handlers: Vec::new(), + channel_post_handlers: Vec::new(), + edited_channel_post_handlers: Vec::new(), + inline_query_handlers: Vec::new(), + chosen_inline_result_handlers: Vec::new(), + callback_query_handlers: Vec::new(), + error_handler, + } + } + + pub fn message_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.message_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.edited_message_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.channel_post_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub fn edited_channel_post_handler( + mut self, + filter: F, + handler: H, + ) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.edited_channel_post_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.inline_query_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub fn chosen_inline_result_handler( + mut self, + filter: F, + handler: H, + ) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.chosen_inline_result_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler + 'a, + { + self.callback_query_handlers + .push((Box::new(filter), Box::new(handler))); + self + } + + pub async fn dispatch(&mut self, updater: U) + where + U: Updater + 'a, + Eh: ErrorHandler>, + { + updater + .for_each_concurrent(None, |res| async { + let Update { kind, id } = match res { + Ok(upd) => upd, + Err(err) => { + self.error_handler + .handle_error(Either::Left(err)) + .await; + return; + } + }; + + log::debug!( + "Handled update#{id:?}: {kind:?}", + id = id, + kind = kind + ); + + match kind { + UpdateKind::Message(mes) => { + Self::handle( + mes, + &self.message_handlers, + &self.error_handler, + ) + .await + } + UpdateKind::EditedMessage(mes) => { + Self::handle( + mes, + &self.edited_message_handlers, + &self.error_handler, + ) + .await; + } + UpdateKind::ChannelPost(post) => { + Self::handle( + post, + &self.channel_post_handlers, + &self.error_handler, + ) + .await; + } + UpdateKind::EditedChannelPost(post) => { + Self::handle( + post, + &self.edited_channel_post_handlers, + &self.error_handler, + ) + .await; + } + UpdateKind::InlineQuery(query) => { + Self::handle( + query, + &self.inline_query_handlers, + &self.error_handler, + ) + .await; + } + UpdateKind::ChosenInlineResult(result) => { + Self::handle( + result, + &self.chosen_inline_result_handlers, + &self.error_handler, + ) + .await; + } + UpdateKind::CallbackQuery(callback) => { + Self::handle( + callback, + &self.callback_query_handlers, + &self.error_handler, + ) + .await; + } + } + }) + .await; + } + + async fn handle( + update: T, + handlers: &FiltersWithHandlers<'a, T, HandlerE>, + error_handler: &Eh, + ) where + T: std::fmt::Debug, + Eh: ErrorHandler>, + { + for x in handlers { + if x.0.test(&update) { + if let Err(err) = x.1.handle(update).await { + error_handler.handle_error(Either::Right(err)).await + } + + return; + } + } + + log::warn!("unhandled update {:?}", update); + } +} + +#[cfg(test)] +mod tests { + use std::{ + convert::Infallible, + sync::atomic::{AtomicI32, Ordering}, + }; + + use crate::{ + dispatching::{FilterDispatcher, Updater}, + types::{ + Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind, + Sender, Update, UpdateKind, User, + }, + }; + + #[tokio::test] + async fn first_handler_executes_1_time() { + let counter = &AtomicI32::new(0); + let counter2 = &AtomicI32::new(0); + + let mut dp = FilterDispatcher::::new(|_| async {}) + .message_handler(true, |_mes: Message| async move { + counter.fetch_add(1, Ordering::SeqCst); + Ok::<_, Infallible>(()) + }) + .message_handler(true, |_mes: Message| async move { + counter2.fetch_add(1, Ordering::SeqCst); + Ok::<_, Infallible>(()) + }); + + dp.dispatch(one_message_updater()).await; + + assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(counter2.load(Ordering::SeqCst), 0); + } + + fn message() -> Message { + Message { + id: 6534, + date: 1_567_898_953, + chat: Chat { + id: 218_485_655, + photo: None, + kind: ChatKind::Private { + type_: (), + first_name: Some("W".to_string()), + last_name: None, + username: Some("WaffleLapkin".to_string()), + }, + }, + kind: MessageKind::Common { + from: Sender::User(User { + id: 457_569_668, + is_bot: true, + first_name: "BT".to_string(), + last_name: None, + username: Some("BloodyTestBot".to_string()), + language_code: None, + }), + forward_kind: ForwardKind::Origin { + reply_to_message: None, + }, + edit_date: None, + media_kind: MediaKind::Text { + text: "text".to_string(), + entities: vec![], + }, + reply_markup: None, + }, + } + } + + fn message_update() -> Update { + Update { + id: 0, + kind: UpdateKind::Message(message()), + } + } + + fn one_message_updater() -> impl Updater { + use futures::{future::ready, stream}; + + stream::once(ready(Ok(message_update()))) + } +} diff --git a/src/dispatching/filters/mod.rs b/src/dispatching/filters/mod.rs index b3f9237f..4f8207a8 100644 --- a/src/dispatching/filters/mod.rs +++ b/src/dispatching/filters/mod.rs @@ -1,3 +1,5 @@ +//! Filters of messages. + pub use main::*; pub use command::*; diff --git a/src/dispatching/handler.rs b/src/dispatching/handler.rs index 9ce74580..c0911be1 100644 --- a/src/dispatching/handler.rs +++ b/src/dispatching/handler.rs @@ -1,44 +1,30 @@ use std::{future::Future, pin::Pin}; -use futures::FutureExt; - -pub type HandlerResult = Result<(), E>; - -/// Asynchronous handler for event `T` (like `&self, I -> Future` fn) -pub trait Handler<'a, T, E> { - fn handle( - &self, +/// An asynchronous handler of a value. +pub trait Handler { + #[must_use] + fn handle<'a>( + &'a self, value: T, - ) -> Pin> + 'a>>; + ) -> Pin> + 'a>> + where + T: 'a; } -pub trait IntoHandlerResult { - fn into_hr(self) -> HandlerResult; -} - -impl IntoHandlerResult for () { - fn into_hr(self) -> HandlerResult { - Ok(()) - } -} - -impl IntoHandlerResult for HandlerResult { - fn into_hr(self) -> HandlerResult { - self - } -} - -impl<'a, F, Fut, R, T, E> Handler<'a, T, E> for F +/// The implementation of `Handler` for `Fn(U) -> Future>`. +impl Handler for F where F: Fn(T) -> Fut, - Fut: Future + 'a, - R: IntoHandlerResult + 'a, - E: 'a, + Fut: Future>, { - fn handle( - &self, + fn handle<'a>( + &'a self, value: T, - ) -> Pin> + 'a>> { - Box::pin(self(value).map(IntoHandlerResult::into_hr)) + ) -> Pin + 'a>> + where + T: 'a, + { + Box::pin(async move { self(value).await }) } } diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index c239778f..f738757d 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -1,15 +1,13 @@ //! Update dispatching. -use async_trait::async_trait; +pub mod error_handlers; +mod filter_dp; +pub mod filters; +mod handler; +pub mod updaters; + +pub use error_handlers::ErrorHandler; +pub use filter_dp::FilterDispatcher; pub use filters::Filter; pub use handler::Handler; - -pub mod dispatchers; -pub mod filters; -pub mod handler; -pub mod updater; - -#[async_trait(? Send)] -pub trait Dispatcher<'a, U> { - async fn dispatch(&'a mut self, updater: U); -} +pub use updaters::Updater; diff --git a/src/dispatching/updater.rs b/src/dispatching/updater.rs deleted file mode 100644 index fdf4aaf1..00000000 --- a/src/dispatching/updater.rs +++ /dev/null @@ -1,159 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -use futures::{stream, Stream, StreamExt}; -use pin_project::pin_project; - -use crate::{bot::Bot, types::Update, RequestError}; - -// Currently just a placeholder, but I'll add here some methods -/// Updater is stream of updates. -/// -/// Telegram supports 2 ways of [getting updates]: [long polling](Long Polling) -/// and webhook -/// -/// ## Long Polling -/// -/// In long polling ([wiki]) you just call [GetUpdates] every N seconds. -/// -/// #### Example: -/// -///
-///     tg                           bot
-///      |                            |
-///      |<---------------------------| Updates? (GetUpdates call)
-///      ↑                            ↑
-///      |          timeout^1         |
-///      ↓                            ↓
-/// Nope |--------------------------->|
-///      ↑                            ↑
-///      | delay between GetUpdates^2 |
-///      ↓                            ↓
-///      |<---------------------------| Updates?
-///      ↑                            ↑
-///      |          timeout^3         |
-///      ↓                            ↓
-/// Yes  |-------[updates 0, 1]------>|
-///      ↑                            ↑
-///      |           delay            |
-///      ↓                            ↓
-///      |<-------[offset = 1]--------| Updates?^4
-///      ↑                            ↑
-///      |           timeout          |
-///      ↓                            ↓
-/// Yes  |---------[update 2]-------->|
-///      ↑                            ↑
-///      |           delay            |
-///      ↓                            ↓
-///      |<-------[offset = 2]--------| Updates?
-///      ↑                            ↑
-///      |           timeout          |
-///      ↓                            ↓
-/// Nope |--------------------------->|
-///      ↑                            ↑
-///      |           delay            |
-///      ↓                            ↓
-///      |<-------[offset = 2]--------| Updates?
-///      ↑                            ↑
-///      |           timeout          |
-///      ↓                            ↓
-/// Nope |--------------------------->|
-///      ↑                            ↑
-///      |           delay            |
-///      ↓                            ↓
-///      |<-------[offset = 2]--------| Updates?
-///      ↑                            ↑
-///      |           timeout          |
-///      ↓                            ↓
-/// Yes  |-------[updates 2..5]------>|
-///      ↑                            ↑
-///      |           delay            |
-///      ↓                            ↓
-///      |<-------[offset = 5]--------| Updates?
-///      ↑                            ↑
-///      |           timeout          |
-///      ↓                            ↓
-/// Nope |--------------------------->|
-///      |                            |
-///      ~    and so on, and so on    ~
-/// 
-/// -/// ^1 Timeout can be even 0 -/// (this is also called short polling), -/// but you should use it **only** for testing purposes -/// -/// ^2 Large delays will cause in bot lags, -/// so delay shouldn't exceed second. -/// -/// ^3 Note that if telegram already have updates for -/// you it will answer you **without** waiting for a timeout -/// -/// ^4 `offset = N` means that we've already received -/// updates `0..=N` -/// -/// [GetUpdates]: crate::requests::payloads::GetUpdates -/// [getting updates]: https://core.telegram.org/bots/api#getting-updates -/// [wiki]: https://en.wikipedia.org/wiki/Push_technology#Long_polling -pub trait Updater: - Stream::Error>> -{ - type Error; -} - -#[pin_project] -pub struct StreamUpdater { - #[pin] - stream: S, -} - -impl StreamUpdater { - pub fn new(stream: S) -> Self { - Self { stream } - } -} - -impl Stream for StreamUpdater -where - S: Stream>, -{ - type Item = Result; - - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().stream.poll_next(cx) - } -} - -impl Updater for StreamUpdater -where - S: Stream>, -{ - type Error = E; -} - -pub fn polling<'a>(bot: &'a Bot) -> impl Updater + 'a { - let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move { - let updates = match bot.get_updates().offset(offset).send().await { - Ok(updates) => { - if let Some(upd) = updates.last() { - offset = upd.id + 1; - } - updates.into_iter().map(Ok).collect::>() - } - Err(err) => vec![Err(err)], - }; - Some((stream::iter(updates), (bot, offset))) - }) - .flatten(); - - StreamUpdater { stream } -} - -// TODO implement webhook (this actually require webserver and probably we -// should add cargo feature that adds webhook) -//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater> + 'a> {} diff --git a/src/dispatching/updaters.rs b/src/dispatching/updaters.rs new file mode 100644 index 00000000..f7fed6f5 --- /dev/null +++ b/src/dispatching/updaters.rs @@ -0,0 +1,171 @@ +//! Receiving updates from Telegram. +//! +//! The key trait here is [`Updater`]. You can get it by these functions: +//! +//! - [`polling_default`], which returns a default long polling updater. +//! - [`polling`], which returns a long/short polling updater with your +//! configuration. +//! +//! And then you can pass it directly to a dispatcher. +//! +//! Telegram supports two ways of [getting updates]: [long]/[short] polling and +//! [webhook]. +//! +//! # Long Polling +//! +//! In long polling, you just call [`Box::get_updates`] every N seconds. +//! +//! ## Example +//! +//!
+//!     tg                           bot
+//!      |                            |
+//!      |<---------------------------| Updates? (Bot::get_updates call)
+//!      ↑                            ↑
+//!      |          timeout^1         |
+//!      ↓                            ↓
+//! Nope |--------------------------->|
+//!      ↑                            ↑
+//!      | delay between Bot::get_updates^2 |
+//!      ↓                            ↓
+//!      |<---------------------------| Updates?
+//!      ↑                            ↑
+//!      |          timeout^3         |
+//!      ↓                            ↓
+//! Yes  |-------[updates 0, 1]------>|
+//!      ↑                            ↑
+//!      |           delay            |
+//!      ↓                            ↓
+//!      |<-------[offset = 1]--------| Updates?^4
+//!      ↑                            ↑
+//!      |           timeout          |
+//!      ↓                            ↓
+//! Yes  |---------[update 2]-------->|
+//!      ↑                            ↑
+//!      |           delay            |
+//!      ↓                            ↓
+//!      |<-------[offset = 2]--------| Updates?
+//!      ↑                            ↑
+//!      |           timeout          |
+//!      ↓                            ↓
+//! Nope |--------------------------->|
+//!      ↑                            ↑
+//!      |           delay            |
+//!      ↓                            ↓
+//!      |<-------[offset = 2]--------| Updates?
+//!      ↑                            ↑
+//!      |           timeout          |
+//!      ↓                            ↓
+//! Nope |--------------------------->|
+//!      ↑                            ↑
+//!      |           delay            |
+//!      ↓                            ↓
+//!      |<-------[offset = 2]--------| Updates?
+//!      ↑                            ↑
+//!      |           timeout          |
+//!      ↓                            ↓
+//! Yes  |-------[updates 2..5]------>|
+//!      ↑                            ↑
+//!      |           delay            |
+//!      ↓                            ↓
+//!      |<-------[offset = 5]--------| Updates?
+//!      ↑                            ↑
+//!      |           timeout          |
+//!      ↓                            ↓
+//! Nope |--------------------------->|
+//!      |                            |
+//!      ~    and so on, and so on    ~
+//! 
+//! +//! ^1 A timeout can be even 0 +//! (this is also called short polling), +//! but you should use it **only** for testing purposes. +//! +//! ^2 Large delays will cause in bot lags, +//! so delay shouldn't exceed second. +//! +//! ^3 Note that if Telegram already have updates for +//! you it will answer you **without** waiting for a timeout. +//! +//! ^4 `offset = N` means that we've already received +//! updates `0..=N`. +//! +//! [`Updater`]: Updater +//! [`polling_default`]: polling_default +//! [`polling`]: polling +//! [`Dispatcher`]: crate::dispatching::Dispatcher::dispatch +//! [`Box::get_updates`]: crate::Bot::get_updates +//! [getting updates]: https://core.telegram.org/bots/api#getting-updates +//! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling +//! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science) +//! [webhook]: https://en.wikipedia.org/wiki/Webhook + +use futures::{stream, Stream, StreamExt}; + +use crate::{ + bot::Bot, requests::payloads::AllowedUpdate, types::Update, RequestError, +}; +use std::{convert::TryInto, time::Duration}; + +/// A generic updater. +pub trait Updater: Stream> { + // TODO: add some methods here (.shutdown(), etc). +} +impl Updater for S where S: Stream> {} + +/// Returns a long polling updater with the default configuration. +/// +/// See also: [`polling`](polling). +pub fn polling_default(bot: &Bot) -> impl Updater + '_ { + polling(bot, None, None, None) +} + +/// Returns a long/short polling updater with some additional options. +/// +/// - `bot`: Using this bot, the returned updater will receive updates. +/// - `timeout`: A timeout for polling. +/// - `limit`: Limits the number of updates to be retrieved at once. Values +/// between 1—100 are accepted. +/// - `allowed_updates`: A list the types of updates you want to receive. +/// See [`GetUpdates`] for defaults. +/// +/// See also: [`polling_default`](polling_default). +/// +/// [`GetUpdates`]: crate::requests::payloads::GetUpdates +pub fn polling( + bot: &Bot, + timeout: Option, + limit: Option, + allowed_updates: Option>, +) -> impl Updater + '_ { + let timeout = + timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); + + stream::unfold( + (allowed_updates, bot, 0), + move |(mut allowed_updates, bot, mut offset)| async move { + let mut req = bot.get_updates().offset(offset); + req.payload.timeout = timeout; + req.payload.limit = limit; + req.payload.allowed_updates = allowed_updates.take(); + + let updates = match req.send().await { + Err(err) => vec![Err(err)], + Ok(updates) => { + if let Some(upd) = updates.last() { + offset = upd.id + 1; + } + updates.into_iter().map(Ok).collect::>() + } + }; + + Some((stream::iter(updates), (allowed_updates, bot, offset))) + }, + ) + .flatten() +} + +// TODO implement webhook (this actually require webserver and probably we +// should add cargo feature that adds webhook) +//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater> + 'a> {}