diff --git a/Cargo.toml b/Cargo.toml index 977a540c..1f1beac4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,8 @@ maintenance = { status = "actively-developed" } [features] default = ["native-tls", "ctrlc_handler", "teloxide-core/default"] +dispatching2 = ["dptree"] + sqlite-storage = ["sqlx"] redis-storage = ["redis"] cbor-serializer = ["serde_cbor"] @@ -75,7 +77,7 @@ teloxide-macros = { version = "0.4", optional = true } serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -dptree = { git = "https://github.com/p0lunin/dptree" } +dptree = { git = "https://github.com/p0lunin/dptree", optional = true } tokio = { version = "1.8", features = ["fs"] } tokio-util = "0.6" diff --git a/src/dispatching/dialogue/dialogue_dispatcher_handler.rs b/src/dispatching/dialogue/dialogue_dispatcher_handler.rs index 827809ea..07dc6e00 100644 --- a/src/dispatching/dialogue/dialogue_dispatcher_handler.rs +++ b/src/dispatching/dialogue/dialogue_dispatcher_handler.rs @@ -1,4 +1,4 @@ -use crate::prelude::{DialogueStage, DialogueWithCx}; +use crate::dispatching::dialogue::{DialogueStage, DialogueWithCx}; use futures::future::BoxFuture; use std::{future::Future, sync::Arc}; diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index d96ef0b0..c7b04389 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -49,6 +49,7 @@ pub mod dialogue; pub mod stop_token; pub mod update_listeners; +#[cfg(not(feature = "dispatching2"))] pub(crate) mod repls; mod dispatcher; @@ -62,6 +63,7 @@ pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; use tokio::sync::mpsc::UnboundedReceiver; pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType}; +#[cfg(feature = "dispatching2")] pub(crate) use dispatcher::{ shutdown_check_timeout_for, shutdown_inner, DispatcherState, ShutdownState, }; diff --git a/src/dispatching2/dispatcher.rs b/src/dispatching2/dispatcher.rs index b43e29fc..b1d34487 100644 --- a/src/dispatching2/dispatcher.rs +++ b/src/dispatching2/dispatcher.rs @@ -13,7 +13,7 @@ use std::{collections::HashSet, convert::Infallible, fmt::Debug, ops::ControlFlo use tokio::{sync::Notify, time::timeout}; pub struct Dispatcher { - requester: R, + requester: Arc, dependencies: DependencyMap, handler: UpdateHandler, @@ -41,10 +41,10 @@ macro_rules! make_parser { impl Dispatcher where - R: 'static, + R: Send + Sync + 'static, Err: Send + Sync + 'static, { - pub fn new(requester: R) -> Self { + pub fn new(requester: Arc) -> Self { Dispatcher { requester, dependencies: DependencyMap::new(), @@ -138,7 +138,6 @@ where UListener: UpdateListener + 'a, Eh: ErrorHandler + 'a, ListenerE: Debug, - R: Requester + Clone, { use crate::dispatching::ShutdownState::*; @@ -202,6 +201,7 @@ where Ok(upd) => { let mut deps = self.dependencies.clone(); deps.insert(upd); + deps.insert_arc(self.requester.clone()); match self.handler.dispatch(deps).await { ControlFlow::Break(Ok(())) => {} ControlFlow::Break(Err(_err)) => todo!("error handler"), @@ -240,7 +240,7 @@ where Dispatcher { dependencies, ..self } } - pub fn message_handler( + pub fn messages_handler( mut self, make_handler: impl FnOnce(UpdateHandler) -> UpdateHandler, ) -> Self { @@ -251,7 +251,7 @@ where self.handler(handler) } - pub fn edited_message_handler( + pub fn edited_messages_handler( mut self, make_handler: impl FnOnce(UpdateHandler) -> UpdateHandler, ) -> Self { diff --git a/src/dispatching2/mod.rs b/src/dispatching2/mod.rs index e89c01c1..be9662d6 100644 --- a/src/dispatching2/mod.rs +++ b/src/dispatching2/mod.rs @@ -1,3 +1,5 @@ +pub(crate) mod repls; + mod dispatcher; pub use dispatcher::Dispatcher; diff --git a/src/dispatching2/repls/commands_repl.rs b/src/dispatching2/repls/commands_repl.rs new file mode 100644 index 00000000..a3625910 --- /dev/null +++ b/src/dispatching2/repls/commands_repl.rs @@ -0,0 +1,98 @@ +use crate::{ + dispatching::{ + update_listeners, update_listeners::UpdateListener, Dispatcher, DispatcherHandlerRx, + DispatcherHandlerRxExt, UpdateWithCx, + }, + error_handlers::{LoggingErrorHandler, OnError}, + utils::command::BotCommand, +}; +use futures::StreamExt; +use std::{fmt::Debug, future::Future, sync::Arc}; +use teloxide_core::{requests::Requester, types::Message}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// A [REPL] for commands. +/// +/// All errors from an update listener and handler will be logged. +/// +/// # Caution +/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, +/// because Telegram disallow multiple requests at the same time from the same +/// bot. +/// +/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop +/// [`Dispatcher`]: crate::dispatching::Dispatcher +#[cfg(feature = "ctrlc_handler")] +pub async fn commands_repl(requester: R, bot_name: N, handler: H) +where + Cmd: BotCommand + Send + 'static, + H: Fn(UpdateWithCx, Cmd) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + Result<(), HandlerE>: OnError, + HandlerE: Debug + Send, + N: Into + Send + 'static, + R: Requester + Send + Clone + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + let cloned_requester = requester.clone(); + + commands_repl_with_listener( + requester, + bot_name, + handler, + update_listeners::polling_default(cloned_requester).await, + ) + .await; +} + +/// Like [`commands_repl`], but with a custom [`UpdateListener`]. +/// +/// All errors from an update listener and handler will be logged. +/// +/// # Caution +/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, +/// because Telegram disallow multiple requests at the same time from the same +/// bot. +/// +/// [`Dispatcher`]: crate::dispatching::Dispatcher +/// [`commands_repl`]: crate::dispatching::repls::commands_repl() +/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +#[cfg(feature = "ctrlc_handler")] +pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>( + requester: R, + bot_name: N, + handler: H, + listener: L, +) where + Cmd: BotCommand + Send + 'static, + H: Fn(UpdateWithCx, Cmd) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + L: UpdateListener + Send + 'a, + ListenerE: Debug + Send + 'a, + Result<(), HandlerE>: OnError, + HandlerE: Debug + Send, + N: Into + Send + 'static, + R: Requester + Clone + Send + 'static, +{ + let handler = Arc::new(handler); + + Dispatcher::::new(requester) + .messages_handler(move |rx: DispatcherHandlerRx| { + UnboundedReceiverStream::new(rx).commands::(bot_name).for_each_concurrent( + None, + move |(cx, cmd)| { + let handler = Arc::clone(&handler); + + async move { + handler(cx, cmd).await.log_on_error().await; + } + }, + ) + }) + .setup_ctrlc_handler() + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await +} diff --git a/src/dispatching2/repls/dialogues_repl.rs b/src/dispatching2/repls/dialogues_repl.rs new file mode 100644 index 00000000..755d4dd3 --- /dev/null +++ b/src/dispatching2/repls/dialogues_repl.rs @@ -0,0 +1,96 @@ +use crate::{ + dispatching::{ + dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx, InMemStorageError}, + update_listeners, + update_listeners::UpdateListener, + Dispatcher, UpdateWithCx, + }, + error_handlers::LoggingErrorHandler, +}; +use std::{fmt::Debug, future::Future, sync::Arc}; +use teloxide_core::{requests::Requester, types::Message}; + +/// A [REPL] for dialogues. +/// +/// All errors from an update listener and handler will be logged. This function +/// uses [`InMemStorage`]. +/// +/// # Caution +/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, +/// because Telegram disallow multiple requests at the same time from the same +/// bot. +/// +/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop +/// [`Dispatcher`]: crate::dispatching::Dispatcher +/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage +#[cfg(feature = "ctrlc_handler")] +pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H) +where + H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, + D: Clone + Default + Send + 'static, + Fut: Future> + Send + 'static, + R: Requester + Send + Clone + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + let cloned_requester = requester.clone(); + + dialogues_repl_with_listener( + requester, + handler, + update_listeners::polling_default(cloned_requester).await, + ) + .await; +} + +/// Like [`dialogues_repl`], but with a custom [`UpdateListener`]. +/// +/// All errors from an update listener and handler will be logged. This function +/// uses [`InMemStorage`]. +/// +/// # Caution +/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, +/// because Telegram disallow multiple requests at the same time from the same +/// bot. +/// +/// [`Dispatcher`]: crate::dispatching::Dispatcher +/// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl() +/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage +#[cfg(feature = "ctrlc_handler")] +pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( + requester: R, + handler: H, + listener: L, +) where + H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, + D: Clone + Default + Send + 'static, + Fut: Future> + Send + 'static, + L: UpdateListener + Send + 'a, + ListenerE: Debug + Send + 'a, + R: Requester + Send + Clone + 'static, +{ + let handler = Arc::new(handler); + + Dispatcher::new(requester) + .messages_handler(DialogueDispatcher::new( + move |DialogueWithCx { cx, dialogue }: DialogueWithCx< + R, + Message, + D, + InMemStorageError, + >| { + let handler = Arc::clone(&handler); + + async move { + let dialogue = dialogue.expect("std::convert::Infallible"); + handler(cx, dialogue).await + } + }, + )) + .setup_ctrlc_handler() + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await; +} diff --git a/src/dispatching2/repls/mod.rs b/src/dispatching2/repls/mod.rs new file mode 100644 index 00000000..858f6f9a --- /dev/null +++ b/src/dispatching2/repls/mod.rs @@ -0,0 +1,7 @@ +//mod commands_repl; +//mod dialogues_repl; +mod repl; + +//pub use commands_repl::{commands_repl, commands_repl_with_listener}; +//pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener}; +pub use repl::{repl, repl_with_listener}; diff --git a/src/dispatching2/repls/repl.rs b/src/dispatching2/repls/repl.rs new file mode 100644 index 00000000..c9e83bb0 --- /dev/null +++ b/src/dispatching2/repls/repl.rs @@ -0,0 +1,77 @@ +use crate::{ + dispatching::{update_listeners, update_listeners::UpdateListener}, + dispatching2::Dispatcher, + error_handlers::{LoggingErrorHandler, OnError}, +}; +use dptree::di::{DependencyMap, Injector}; +use std::{fmt::Debug, sync::Arc}; +use teloxide_core::requests::Requester; + +/// A [REPL] for messages. +/// +/// All errors from an update listener and a handler will be logged. +/// +/// # Caution +/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, +/// because Telegram disallow multiple requests at the same time from the same +/// bot. +/// +/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop +/// [`Dispatcher`]: crate::dispatching::Dispatcher +#[cfg(feature = "ctrlc_handler")] +pub async fn repl(requester: R, handler: H) +where + H: Injector, Args> + Send + Sync + 'static, + Result<(), E>: OnError, + E: Debug + Send + Sync + 'static, + R: Requester + Send + Sync + Clone + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + let cloned_requester = requester.clone(); + repl_with_listener( + requester, + handler, + update_listeners::polling_default(cloned_requester).await, + ) + .await; +} + +/// Like [`repl`], but with a custom [`UpdateListener`]. +/// +/// All errors from an update listener and handler will be logged. +/// +/// # Caution +/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, +/// because Telegram disallow multiple requests at the same time from the same +/// bot. +/// +/// [`Dispatcher`]: crate::dispatching::Dispatcher +/// [`repl`]: crate::dispatching::repls::repl() +/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +#[cfg(feature = "ctrlc_handler")] +pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>( + requester: R, + handler: H, + listener: L, +) where + H: Injector, Args> + Send + Sync + 'static, + L: UpdateListener + Send + 'a, + ListenerE: Debug, + Result<(), E>: OnError, + E: Debug + Send + Sync + 'static, + R: Requester + Send + Sync + 'static, +{ + #[allow(unused_mut)] + let mut dispatcher = Dispatcher::new(Arc::new(requester)) + .messages_handler(|h| h.branch(dptree::endpoint(handler))); + + #[cfg(feature = "ctrlc_handler")] + let mut dispatcher = dispatcher.setup_ctrlc_handler(); + + dispatcher + .dispatch_with_listener( + listener, + LoggingErrorHandler::with_custom_text("An error from the update listener"), + ) + .await; +} diff --git a/src/lib.rs b/src/lib.rs index 047bdb67..55121bdd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,14 +60,19 @@ // https://github.com/rust-lang/rust-clippy/issues/7422 #![allow(clippy::nonstandard_macro_braces)] +#[cfg(not(feature = "dispatching2"))] pub use dispatching::repls::{ commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl, repl_with_listener, }; +#[cfg(feature = "dispatching2")] +pub use dispatching2::repls::{repl, repl_with_listener}; + mod logging; pub mod dispatching; +#[cfg(feature = "dispatching2")] pub mod dispatching2; pub mod error_handlers; pub mod prelude; diff --git a/src/prelude.rs b/src/prelude.rs index b7f9a040..2f0d107d 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,17 +1,22 @@ //! Commonly used items. pub use crate::{ - dispatching::{ - dialogue::{ - exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition, - TransitionIn, TransitionOut, - }, - Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx, - }, error_handlers::{LoggingErrorHandler, OnError}, respond, }; +#[cfg(not(feature = "dispatching2"))] +pub use crate::dispatching::{ + dialogue::{ + exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition, + TransitionIn, TransitionOut, + }, + Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx, +}; + +#[cfg(feature = "dispatching2")] +pub use crate::dispatching2::Dispatcher; + #[cfg_attr(all(docsrs, feature = "nightly"), doc(cfg(feature = "macros")))] #[cfg(feature = "macros")] pub use crate::teloxide;