diff --git a/README.md b/README.md index e8c25397..ab3049a1 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,6 @@ $ rustup update stable [dependencies] teloxide = "0.1.0" log = "0.4.8" -futures = "0.3.4" tokio = "0.2.11" pretty_env_logger = "0.4.0" ``` @@ -147,16 +146,15 @@ async fn answer( } async fn handle_command(rx: DispatcherHandlerRx) { - rx.filter_map(|cx| { - future::ready(cx.update.text_owned().map(|text| (cx, text))) - }) - .filter_map(|(cx, text)| { - future::ready(Command::parse(&text).map(|(command, _)| (cx, command))) - }) - .for_each_concurrent(None, |(cx, command)| async move { - answer(cx, command).await.log_on_error().await; - }) - .await; + // Only iterate through text messages: + rx.text_messages() + // Only iterate through commands in a proper format: + .commands::() + // Execute all incoming commands concurrently: + .for_each_concurrent(None, |(cx, command, _)| async move { + answer(cx, command).await.log_on_error().await; + }) + .await; } #[tokio::main] @@ -184,7 +182,7 @@ TELOXIDE_TOKEN= cargo run -See? The dispatcher gives us a stream of messages, so we can handle it as we want! Here we use [`.filter_map()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.filter_map) and [`.for_each_concurrent()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent), but others are also available: +See? The dispatcher gives us a stream of messages, so we can handle it as we want! Here we use our `.text_messages()`, `.commands()`, and [`.for_each_concurrent()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent), but others are also available: - [`.flatten()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.flatten) - [`.left_stream()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.left_stream) - [`.scan()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.scan) diff --git a/examples/admin_bot/src/main.rs b/examples/admin_bot/src/main.rs index 0a5aebae..dd095935 100644 --- a/examples/admin_bot/src/main.rs +++ b/examples/admin_bot/src/main.rs @@ -176,21 +176,13 @@ async fn action( // Handle all messages. async fn handle_commands(rx: DispatcherHandlerRx) { - rx.filter(|cx| future::ready(!cx.update.chat.is_group())) - .filter_map(|cx| { - future::ready(cx.update.text_owned().map(|text| (cx, text))) - }) - .filter_map(|(cx, text)| { - future::ready(Command::parse(&text).map(|(command, args)| { - ( - cx, - command, - args.into_iter() - .map(ToOwned::to_owned) - .collect::>(), - ) - })) - }) + // Only iterate through messages from groups: + rx.filter(|cx| future::ready(cx.update.chat.is_group())) + // Only iterate through text messages: + .text_messages() + // Only iterate through commands in a proper format: + .commands::() + // Execute all incoming commands concurrently: .for_each_concurrent(None, |(cx, command, args)| async move { action(cx, command, &args).await.log_on_error().await; }) diff --git a/examples/simple_commands_bot/src/main.rs b/examples/simple_commands_bot/src/main.rs index aefd8683..802d23b8 100644 --- a/examples/simple_commands_bot/src/main.rs +++ b/examples/simple_commands_bot/src/main.rs @@ -1,6 +1,5 @@ use teloxide::{prelude::*, utils::command::BotCommand}; -use futures::future; use rand::{thread_rng, Rng}; #[derive(BotCommand)] @@ -32,16 +31,15 @@ async fn answer( } async fn handle_command(rx: DispatcherHandlerRx) { - rx.filter_map(|cx| { - future::ready(cx.update.text_owned().map(|text| (cx, text))) - }) - .filter_map(|(cx, text)| { - future::ready(Command::parse(&text).map(|(command, _)| (cx, command))) - }) - .for_each_concurrent(None, |(cx, command)| async move { - answer(cx, command).await.log_on_error().await; - }) - .await; + // Only iterate through text messages: + rx.text_messages() + // Only iterate through commands in a proper format: + .commands::() + // Execute all incoming commands concurrently: + .for_each_concurrent(None, |(cx, command, _)| async move { + answer(cx, command).await.log_on_error().await; + }) + .await; } #[tokio::main] diff --git a/src/dispatching/dispatcher_handler_rx_ext.rs b/src/dispatching/dispatcher_handler_rx_ext.rs new file mode 100644 index 00000000..c241ab9d --- /dev/null +++ b/src/dispatching/dispatcher_handler_rx_ext.rs @@ -0,0 +1,61 @@ +use crate::{ + prelude::DispatcherHandlerCx, types::Message, utils::command::BotCommand, +}; +use futures::{stream::BoxStream, Stream, StreamExt}; + +/// An extension trait to be used with [`DispatcherHandlerRx`]. +/// +/// [`DispatcherHandlerRx`]: crate:dispatching::DispatcherHandlerRx +pub trait DispatcherHandlerRxExt { + /// Extracts only text messages from this stream. + fn text_messages( + self, + ) -> BoxStream<'static, (DispatcherHandlerCx, String)> + where + Self: Stream>; + + /// Extracts only commands with their arguments from this stream of text + /// messages. + fn commands( + self, + ) -> BoxStream<'static, (DispatcherHandlerCx, C, Vec)> + where + Self: Stream, String)>, + C: BotCommand; +} + +impl DispatcherHandlerRxExt for T +where + T: Send + 'static, +{ + fn text_messages( + self, + ) -> BoxStream<'static, (DispatcherHandlerCx, String)> + where + Self: Stream>, + { + Box::pin(self.filter_map(|cx| async move { + cx.update.text_owned().map(|text| (cx, text)) + })) + } + + fn commands( + self, + ) -> BoxStream<'static, (DispatcherHandlerCx, C, Vec)> + where + Self: Stream, String)>, + C: BotCommand, + { + Box::pin(self.filter_map(|(cx, text)| async move { + C::parse(&text).map(|(command, args)| { + ( + cx, + command, + args.into_iter() + .map(ToOwned::to_owned) + .collect::>(), + ) + }) + })) + } +} diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index 14eb610a..e00bf008 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -81,11 +81,13 @@ pub mod dialogue; mod dispatcher; mod dispatcher_handler; mod dispatcher_handler_cx; +mod dispatcher_handler_rx_ext; pub mod update_listeners; pub use dispatcher::Dispatcher; pub use dispatcher_handler::DispatcherHandler; pub use dispatcher_handler_cx::DispatcherHandlerCx; +pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; use tokio::sync::mpsc::UnboundedReceiver; /// A type of a stream, consumed by [`Dispatcher`]'s handlers. diff --git a/src/lib.rs b/src/lib.rs index cdbdf522..92f09987 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,6 @@ //! [dependencies] //! teloxide = "0.1.0" //! log = "0.4.8" -//! futures = "0.3.4" //! tokio = "0.2.11" //! pretty_env_logger = "0.4.0" //! ``` @@ -99,7 +98,6 @@ //! ```no_run //! // Imports are omitted... //! # use teloxide::{prelude::*, utils::command::BotCommand}; -//! # use futures::future; //! # use rand::{thread_rng, Rng}; //! //! #[derive(BotCommand)] @@ -134,18 +132,15 @@ //! } //! //! async fn handle_command(rx: DispatcherHandlerRx) { -//! rx.filter_map(|cx| { -//! future::ready(cx.update.text_owned().map(|text| (cx, text))) -//! }) -//! .filter_map(|(cx, text)| { -//! future::ready( -//! Command::parse(&text).map(|(command, _)| (cx, command)), -//! ) -//! }) -//! .for_each_concurrent(None, |(cx, command)| async move { -//! answer(cx, command).await.log_on_error().await; -//! }) -//! .await; +//! // Only iterate through text messages: +//! rx.text_messages() +//! // Only iterate through commands in a proper format: +//! .commands::() +//! // Execute all incoming commands concurrently: +//! .for_each_concurrent(None, |(cx, command, _)| async move { +//! answer(cx, command).await.log_on_error().await; +//! }) +//! .await; //! } //! //! #[tokio::main] @@ -178,8 +173,8 @@ //! //! //! See? The dispatcher gives us a stream of messages, so we can handle it as we -//! want! Here we use [`.filter_map()`] and [`.for_each_concurrent()`], but -//! others are also available: +//! want! Here we use our `.text_messages()`, `.commands()`, and +//! [`.for_each_concurrent()`], but others are also available: //! - [`.flatten()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.flatten) //! - [`.left_stream()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.left_stream) //! - [`.scan()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.scan) @@ -349,7 +344,6 @@ //! [@Botfather]: https://t.me/botfather //! [streams]: https://docs.rs/futures/0.3.4/futures/stream/index.html //! [all 30+ patterns]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html -//! [`.filter_map()`]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.filter_map //! [`.for_each_concurrent()`]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent //! [See more examples]: https://github.com/teloxide/teloxide/tree/master/examples //! [category theory]: https://en.wikipedia.org/wiki/Category_theory diff --git a/src/prelude.rs b/src/prelude.rs index 4e79ec5e..318234dc 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -7,6 +7,7 @@ pub use crate::{ DialogueStage, GetChatId, }, Dispatcher, DispatcherHandlerCx, DispatcherHandlerRx, + DispatcherHandlerRxExt, }, error_handlers::{LoggingErrorHandler, OnError}, requests::{Request, ResponseResult},