Add DispatcherHandlerRxExt

This commit is contained in:
Temirkhan Myrzamadi 2020-02-19 15:53:54 +06:00
parent 9d78b7e0a6
commit 46b20f676e
7 changed files with 101 additions and 55 deletions

View file

@ -63,7 +63,6 @@ $ rustup update stable
[dependencies] [dependencies]
teloxide = "0.1.0" teloxide = "0.1.0"
log = "0.4.8" log = "0.4.8"
futures = "0.3.4"
tokio = "0.2.11" tokio = "0.2.11"
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
``` ```
@ -147,16 +146,15 @@ async fn answer(
} }
async fn handle_command(rx: DispatcherHandlerRx<Message>) { async fn handle_command(rx: DispatcherHandlerRx<Message>) {
rx.filter_map(|cx| { // Only iterate through text messages:
future::ready(cx.update.text_owned().map(|text| (cx, text))) rx.text_messages()
}) // Only iterate through commands in a proper format:
.filter_map(|(cx, text)| { .commands::<Command>()
future::ready(Command::parse(&text).map(|(command, _)| (cx, command))) // Execute all incoming commands concurrently:
}) .for_each_concurrent(None, |(cx, command, _)| async move {
.for_each_concurrent(None, |(cx, command)| async move { answer(cx, command).await.log_on_error().await;
answer(cx, command).await.log_on_error().await; })
}) .await;
.await;
} }
#[tokio::main] #[tokio::main]
@ -184,7 +182,7 @@ TELOXIDE_TOKEN=<Your token here> cargo run
</div> </div>
See? The dispatcher gives us a stream of messages, so we can handle it as we want! Here we use [`.filter_map()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.filter_map) and [`.for_each_concurrent()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent), but others are also available: See? The dispatcher gives us a stream of messages, so we can handle it as we want! Here we use our `.text_messages()`, `.commands()`, and [`.for_each_concurrent()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent), but others are also available:
- [`.flatten()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.flatten) - [`.flatten()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.flatten)
- [`.left_stream()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.left_stream) - [`.left_stream()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.left_stream)
- [`.scan()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.scan) - [`.scan()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.scan)

View file

@ -176,21 +176,13 @@ async fn action(
// Handle all messages. // Handle all messages.
async fn handle_commands(rx: DispatcherHandlerRx<Message>) { async fn handle_commands(rx: DispatcherHandlerRx<Message>) {
rx.filter(|cx| future::ready(!cx.update.chat.is_group())) // Only iterate through messages from groups:
.filter_map(|cx| { rx.filter(|cx| future::ready(cx.update.chat.is_group()))
future::ready(cx.update.text_owned().map(|text| (cx, text))) // Only iterate through text messages:
}) .text_messages()
.filter_map(|(cx, text)| { // Only iterate through commands in a proper format:
future::ready(Command::parse(&text).map(|(command, args)| { .commands::<Command>()
( // Execute all incoming commands concurrently:
cx,
command,
args.into_iter()
.map(ToOwned::to_owned)
.collect::<Vec<String>>(),
)
}))
})
.for_each_concurrent(None, |(cx, command, args)| async move { .for_each_concurrent(None, |(cx, command, args)| async move {
action(cx, command, &args).await.log_on_error().await; action(cx, command, &args).await.log_on_error().await;
}) })

View file

@ -1,6 +1,5 @@
use teloxide::{prelude::*, utils::command::BotCommand}; use teloxide::{prelude::*, utils::command::BotCommand};
use futures::future;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
#[derive(BotCommand)] #[derive(BotCommand)]
@ -32,16 +31,15 @@ async fn answer(
} }
async fn handle_command(rx: DispatcherHandlerRx<Message>) { async fn handle_command(rx: DispatcherHandlerRx<Message>) {
rx.filter_map(|cx| { // Only iterate through text messages:
future::ready(cx.update.text_owned().map(|text| (cx, text))) rx.text_messages()
}) // Only iterate through commands in a proper format:
.filter_map(|(cx, text)| { .commands::<Command>()
future::ready(Command::parse(&text).map(|(command, _)| (cx, command))) // Execute all incoming commands concurrently:
}) .for_each_concurrent(None, |(cx, command, _)| async move {
.for_each_concurrent(None, |(cx, command)| async move { answer(cx, command).await.log_on_error().await;
answer(cx, command).await.log_on_error().await; })
}) .await;
.await;
} }
#[tokio::main] #[tokio::main]

View file

@ -0,0 +1,61 @@
use crate::{
prelude::DispatcherHandlerCx, types::Message, utils::command::BotCommand,
};
use futures::{stream::BoxStream, Stream, StreamExt};
/// An extension trait to be used with [`DispatcherHandlerRx`].
///
/// [`DispatcherHandlerRx`]: crate:dispatching::DispatcherHandlerRx
pub trait DispatcherHandlerRxExt {
/// Extracts only text messages from this stream.
fn text_messages(
self,
) -> BoxStream<'static, (DispatcherHandlerCx<Message>, String)>
where
Self: Stream<Item = DispatcherHandlerCx<Message>>;
/// Extracts only commands with their arguments from this stream of text
/// messages.
fn commands<C>(
self,
) -> BoxStream<'static, (DispatcherHandlerCx<Message>, C, Vec<String>)>
where
Self: Stream<Item = (DispatcherHandlerCx<Message>, String)>,
C: BotCommand;
}
impl<T> DispatcherHandlerRxExt for T
where
T: Send + 'static,
{
fn text_messages(
self,
) -> BoxStream<'static, (DispatcherHandlerCx<Message>, String)>
where
Self: Stream<Item = DispatcherHandlerCx<Message>>,
{
Box::pin(self.filter_map(|cx| async move {
cx.update.text_owned().map(|text| (cx, text))
}))
}
fn commands<C>(
self,
) -> BoxStream<'static, (DispatcherHandlerCx<Message>, C, Vec<String>)>
where
Self: Stream<Item = (DispatcherHandlerCx<Message>, String)>,
C: BotCommand,
{
Box::pin(self.filter_map(|(cx, text)| async move {
C::parse(&text).map(|(command, args)| {
(
cx,
command,
args.into_iter()
.map(ToOwned::to_owned)
.collect::<Vec<String>>(),
)
})
}))
}
}

View file

@ -81,11 +81,13 @@ pub mod dialogue;
mod dispatcher; mod dispatcher;
mod dispatcher_handler; mod dispatcher_handler;
mod dispatcher_handler_cx; mod dispatcher_handler_cx;
mod dispatcher_handler_rx_ext;
pub mod update_listeners; pub mod update_listeners;
pub use dispatcher::Dispatcher; pub use dispatcher::Dispatcher;
pub use dispatcher_handler::DispatcherHandler; pub use dispatcher_handler::DispatcherHandler;
pub use dispatcher_handler_cx::DispatcherHandlerCx; pub use dispatcher_handler_cx::DispatcherHandlerCx;
pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt;
use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedReceiver;
/// A type of a stream, consumed by [`Dispatcher`]'s handlers. /// A type of a stream, consumed by [`Dispatcher`]'s handlers.

View file

@ -42,7 +42,6 @@
//! [dependencies] //! [dependencies]
//! teloxide = "0.1.0" //! teloxide = "0.1.0"
//! log = "0.4.8" //! log = "0.4.8"
//! futures = "0.3.4"
//! tokio = "0.2.11" //! tokio = "0.2.11"
//! pretty_env_logger = "0.4.0" //! pretty_env_logger = "0.4.0"
//! ``` //! ```
@ -99,7 +98,6 @@
//! ```no_run //! ```no_run
//! // Imports are omitted... //! // Imports are omitted...
//! # use teloxide::{prelude::*, utils::command::BotCommand}; //! # use teloxide::{prelude::*, utils::command::BotCommand};
//! # use futures::future;
//! # use rand::{thread_rng, Rng}; //! # use rand::{thread_rng, Rng};
//! //!
//! #[derive(BotCommand)] //! #[derive(BotCommand)]
@ -134,18 +132,15 @@
//! } //! }
//! //!
//! async fn handle_command(rx: DispatcherHandlerRx<Message>) { //! async fn handle_command(rx: DispatcherHandlerRx<Message>) {
//! rx.filter_map(|cx| { //! // Only iterate through text messages:
//! future::ready(cx.update.text_owned().map(|text| (cx, text))) //! rx.text_messages()
//! }) //! // Only iterate through commands in a proper format:
//! .filter_map(|(cx, text)| { //! .commands::<Command>()
//! future::ready( //! // Execute all incoming commands concurrently:
//! Command::parse(&text).map(|(command, _)| (cx, command)), //! .for_each_concurrent(None, |(cx, command, _)| async move {
//! ) //! answer(cx, command).await.log_on_error().await;
//! }) //! })
//! .for_each_concurrent(None, |(cx, command)| async move { //! .await;
//! answer(cx, command).await.log_on_error().await;
//! })
//! .await;
//! } //! }
//! //!
//! #[tokio::main] //! #[tokio::main]
@ -178,8 +173,8 @@
//! //!
//! //!
//! See? The dispatcher gives us a stream of messages, so we can handle it as we //! See? The dispatcher gives us a stream of messages, so we can handle it as we
//! want! Here we use [`.filter_map()`] and [`.for_each_concurrent()`], but //! want! Here we use our `.text_messages()`, `.commands()`, and
//! others are also available: //! [`.for_each_concurrent()`], but others are also available:
//! - [`.flatten()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.flatten) //! - [`.flatten()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.flatten)
//! - [`.left_stream()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.left_stream) //! - [`.left_stream()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.left_stream)
//! - [`.scan()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.scan) //! - [`.scan()`](https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.scan)
@ -349,7 +344,6 @@
//! [@Botfather]: https://t.me/botfather //! [@Botfather]: https://t.me/botfather
//! [streams]: https://docs.rs/futures/0.3.4/futures/stream/index.html //! [streams]: https://docs.rs/futures/0.3.4/futures/stream/index.html
//! [all 30+ patterns]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html //! [all 30+ patterns]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html
//! [`.filter_map()`]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.filter_map
//! [`.for_each_concurrent()`]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent //! [`.for_each_concurrent()`]: https://docs.rs/futures/0.3.4/futures/stream/trait.StreamExt.html#method.for_each_concurrent
//! [See more examples]: https://github.com/teloxide/teloxide/tree/master/examples //! [See more examples]: https://github.com/teloxide/teloxide/tree/master/examples
//! [category theory]: https://en.wikipedia.org/wiki/Category_theory //! [category theory]: https://en.wikipedia.org/wiki/Category_theory

View file

@ -7,6 +7,7 @@ pub use crate::{
DialogueStage, GetChatId, DialogueStage, GetChatId,
}, },
Dispatcher, DispatcherHandlerCx, DispatcherHandlerRx, Dispatcher, DispatcherHandlerCx, DispatcherHandlerRx,
DispatcherHandlerRxExt,
}, },
error_handlers::{LoggingErrorHandler, OnError}, error_handlers::{LoggingErrorHandler, OnError},
requests::{Request, ResponseResult}, requests::{Request, ResponseResult},