added dispatching2 feature, implemented standard repls

This commit is contained in:
p0lunin 2021-11-25 13:44:55 +02:00
parent 14083c8300
commit 93ed028e44
11 changed files with 309 additions and 15 deletions

View file

@ -27,6 +27,8 @@ maintenance = { status = "actively-developed" }
[features]
default = ["native-tls", "ctrlc_handler", "teloxide-core/default"]
dispatching2 = ["dptree"]
sqlite-storage = ["sqlx"]
redis-storage = ["redis"]
cbor-serializer = ["serde_cbor"]
@ -75,7 +77,7 @@ teloxide-macros = { version = "0.4", optional = true }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
dptree = { git = "https://github.com/p0lunin/dptree" }
dptree = { git = "https://github.com/p0lunin/dptree", optional = true }
tokio = { version = "1.8", features = ["fs"] }
tokio-util = "0.6"

View file

@ -1,4 +1,4 @@
use crate::prelude::{DialogueStage, DialogueWithCx};
use crate::dispatching::dialogue::{DialogueStage, DialogueWithCx};
use futures::future::BoxFuture;
use std::{future::Future, sync::Arc};

View file

@ -49,6 +49,7 @@ pub mod dialogue;
pub mod stop_token;
pub mod update_listeners;
#[cfg(not(feature = "dispatching2"))]
pub(crate) mod repls;
mod dispatcher;
@ -62,6 +63,7 @@ pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt;
use tokio::sync::mpsc::UnboundedReceiver;
pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType};
#[cfg(feature = "dispatching2")]
pub(crate) use dispatcher::{
shutdown_check_timeout_for, shutdown_inner, DispatcherState, ShutdownState,
};

View file

@ -13,7 +13,7 @@ use std::{collections::HashSet, convert::Infallible, fmt::Debug, ops::ControlFlo
use tokio::{sync::Notify, time::timeout};
pub struct Dispatcher<R, Err> {
requester: R,
requester: Arc<R>,
dependencies: DependencyMap,
handler: UpdateHandler<Err>,
@ -41,10 +41,10 @@ macro_rules! make_parser {
impl<R, Err> Dispatcher<R, Err>
where
R: 'static,
R: Send + Sync + 'static,
Err: Send + Sync + 'static,
{
pub fn new(requester: R) -> Self {
pub fn new(requester: Arc<R>) -> Self {
Dispatcher {
requester,
dependencies: DependencyMap::new(),
@ -138,7 +138,6 @@ where
UListener: UpdateListener<ListenerE> + 'a,
Eh: ErrorHandler<ListenerE> + 'a,
ListenerE: Debug,
R: Requester + Clone,
{
use crate::dispatching::ShutdownState::*;
@ -202,6 +201,7 @@ where
Ok(upd) => {
let mut deps = self.dependencies.clone();
deps.insert(upd);
deps.insert_arc(self.requester.clone());
match self.handler.dispatch(deps).await {
ControlFlow::Break(Ok(())) => {}
ControlFlow::Break(Err(_err)) => todo!("error handler"),
@ -240,7 +240,7 @@ where
Dispatcher { dependencies, ..self }
}
pub fn message_handler(
pub fn messages_handler(
mut self,
make_handler: impl FnOnce(UpdateHandler<Err>) -> UpdateHandler<Err>,
) -> Self {
@ -251,7 +251,7 @@ where
self.handler(handler)
}
pub fn edited_message_handler(
pub fn edited_messages_handler(
mut self,
make_handler: impl FnOnce(UpdateHandler<Err>) -> UpdateHandler<Err>,
) -> Self {

View file

@ -1,3 +1,5 @@
pub(crate) mod repls;
mod dispatcher;
pub use dispatcher::Dispatcher;

View file

@ -0,0 +1,98 @@
use crate::{
dispatching::{
update_listeners, update_listeners::UpdateListener, Dispatcher, DispatcherHandlerRx,
DispatcherHandlerRxExt, UpdateWithCx,
},
error_handlers::{LoggingErrorHandler, OnError},
utils::command::BotCommand,
};
use futures::StreamExt;
use std::{fmt::Debug, future::Future, sync::Arc};
use teloxide_core::{requests::Requester, types::Message};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A [REPL] for commands.
///
/// All errors from an update listener and handler will be logged.
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl<R, Cmd, H, Fut, HandlerE, N>(requester: R, bot_name: N, handler: H)
where
Cmd: BotCommand + Send + 'static,
H: Fn(UpdateWithCx<R, Message>, Cmd) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), HandlerE>> + Send + 'static,
Result<(), HandlerE>: OnError<HandlerE>,
HandlerE: Debug + Send,
N: Into<String> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let cloned_requester = requester.clone();
commands_repl_with_listener(
requester,
bot_name,
handler,
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
/// Like [`commands_repl`], but with a custom [`UpdateListener`].
///
/// All errors from an update listener and handler will be logged.
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`commands_repl`]: crate::dispatching::repls::commands_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>(
requester: R,
bot_name: N,
handler: H,
listener: L,
) where
Cmd: BotCommand + Send + 'static,
H: Fn(UpdateWithCx<R, Message>, Cmd) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), HandlerE>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a,
Result<(), HandlerE>: OnError<HandlerE>,
HandlerE: Debug + Send,
N: Into<String> + Send + 'static,
R: Requester + Clone + Send + 'static,
{
let handler = Arc::new(handler);
Dispatcher::<R>::new(requester)
.messages_handler(move |rx: DispatcherHandlerRx<R, Message>| {
UnboundedReceiverStream::new(rx).commands::<Cmd, N>(bot_name).for_each_concurrent(
None,
move |(cx, cmd)| {
let handler = Arc::clone(&handler);
async move {
handler(cx, cmd).await.log_on_error().await;
}
},
)
})
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await
}

View file

@ -0,0 +1,96 @@
use crate::{
dispatching::{
dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx, InMemStorageError},
update_listeners,
update_listeners::UpdateListener,
Dispatcher, UpdateWithCx,
},
error_handlers::LoggingErrorHandler,
};
use std::{fmt::Debug, future::Future, sync::Arc};
use teloxide_core::{requests::Requester, types::Message};
/// A [REPL] for dialogues.
///
/// All errors from an update listener and handler will be logged. This function
/// uses [`InMemStorage`].
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H)
where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let cloned_requester = requester.clone();
dialogues_repl_with_listener(
requester,
handler,
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
/// Like [`dialogues_repl`], but with a custom [`UpdateListener`].
///
/// All errors from an update listener and handler will be logged. This function
/// uses [`InMemStorage`].
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
requester: R,
handler: H,
listener: L,
) where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a,
R: Requester + Send + Clone + 'static,
{
let handler = Arc::new(handler);
Dispatcher::new(requester)
.messages_handler(DialogueDispatcher::new(
move |DialogueWithCx { cx, dialogue }: DialogueWithCx<
R,
Message,
D,
InMemStorageError,
>| {
let handler = Arc::clone(&handler);
async move {
let dialogue = dialogue.expect("std::convert::Infallible");
handler(cx, dialogue).await
}
},
))
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
}

View file

@ -0,0 +1,7 @@
//mod commands_repl;
//mod dialogues_repl;
mod repl;
//pub use commands_repl::{commands_repl, commands_repl_with_listener};
//pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener};
pub use repl::{repl, repl_with_listener};

View file

@ -0,0 +1,77 @@
use crate::{
dispatching::{update_listeners, update_listeners::UpdateListener},
dispatching2::Dispatcher,
error_handlers::{LoggingErrorHandler, OnError},
};
use dptree::di::{DependencyMap, Injector};
use std::{fmt::Debug, sync::Arc};
use teloxide_core::requests::Requester;
/// A [REPL] for messages.
///
/// All errors from an update listener and a handler will be logged.
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn repl<R, H, E, Args>(requester: R, handler: H)
where
H: Injector<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
Result<(), E>: OnError<E>,
E: Debug + Send + Sync + 'static,
R: Requester + Send + Sync + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let cloned_requester = requester.clone();
repl_with_listener(
requester,
handler,
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
/// Like [`repl`], but with a custom [`UpdateListener`].
///
/// All errors from an update listener and handler will be logged.
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`repl`]: crate::dispatching::repls::repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(
requester: R,
handler: H,
listener: L,
) where
H: Injector<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug,
Result<(), E>: OnError<E>,
E: Debug + Send + Sync + 'static,
R: Requester + Send + Sync + 'static,
{
#[allow(unused_mut)]
let mut dispatcher = Dispatcher::new(Arc::new(requester))
.messages_handler(|h| h.branch(dptree::endpoint(handler)));
#[cfg(feature = "ctrlc_handler")]
let mut dispatcher = dispatcher.setup_ctrlc_handler();
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
}

View file

@ -60,14 +60,19 @@
// https://github.com/rust-lang/rust-clippy/issues/7422
#![allow(clippy::nonstandard_macro_braces)]
#[cfg(not(feature = "dispatching2"))]
pub use dispatching::repls::{
commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl,
repl_with_listener,
};
#[cfg(feature = "dispatching2")]
pub use dispatching2::repls::{repl, repl_with_listener};
mod logging;
pub mod dispatching;
#[cfg(feature = "dispatching2")]
pub mod dispatching2;
pub mod error_handlers;
pub mod prelude;

View file

@ -1,17 +1,22 @@
//! Commonly used items.
pub use crate::{
dispatching::{
dialogue::{
exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition,
TransitionIn, TransitionOut,
},
Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx,
},
error_handlers::{LoggingErrorHandler, OnError},
respond,
};
#[cfg(not(feature = "dispatching2"))]
pub use crate::dispatching::{
dialogue::{
exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition,
TransitionIn, TransitionOut,
},
Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx,
};
#[cfg(feature = "dispatching2")]
pub use crate::dispatching2::Dispatcher;
#[cfg_attr(all(docsrs, feature = "nightly"), doc(cfg(feature = "macros")))]
#[cfg(feature = "macros")]
pub use crate::teloxide;