diff --git a/Cargo.toml b/Cargo.toml index ed919a41..977a540c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ teloxide-macros = { version = "0.4", optional = true } serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -dptree = { git = "https://github.com/p0lunin/dptree", branch = "kiss3" } +dptree = { git = "https://github.com/p0lunin/dptree" } tokio = { version = "1.8", features = ["fs"] } tokio-util = "0.6" diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 77133764..edcc15d7 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -544,8 +544,8 @@ impl std::error::Error for IdleShutdownError {} /// A token which used to shutdown [`Dispatcher`]. #[derive(Clone)] pub struct ShutdownToken { - dispatcher_state: Arc, - shutdown_notify_back: Arc, + pub(crate) dispatcher_state: Arc, + pub(crate) shutdown_notify_back: Arc, } impl ShutdownToken { @@ -566,20 +566,20 @@ impl ShutdownToken { } } -struct DispatcherState { +pub(crate) struct DispatcherState { inner: AtomicU8, } impl DispatcherState { - fn load(&self) -> ShutdownState { + pub(crate) fn load(&self) -> ShutdownState { ShutdownState::from_u8(self.inner.load(Ordering::SeqCst)) } - fn store(&self, new: ShutdownState) { + pub(crate) fn store(&self, new: ShutdownState) { self.inner.store(new as _, Ordering::SeqCst) } - fn compare_exchange( + pub(crate) fn compare_exchange( &self, current: ShutdownState, new: ShutdownState, @@ -599,7 +599,7 @@ impl Default for DispatcherState { #[repr(u8)] #[derive(Debug)] -enum ShutdownState { +pub(crate) enum ShutdownState { Running, ShuttingDown, Idle, @@ -620,21 +620,17 @@ impl ShutdownState { } } -fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { +pub(crate) fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); - - // FIXME: replace this by just Duration::ZERO once 1.53 will be released - const DZERO: Duration = Duration::from_secs(0); + const DZERO: Duration = Duration::ZERO; let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); - - // FIXME: replace this by just saturating_add once 1.53 will be released - shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) + shutdown_check_timeout.saturating_add(MIN_SHUTDOWN_CHECK_TIMEOUT) } -struct AlreadyShuttingDown; +pub(crate) struct AlreadyShuttingDown; -fn shutdown_inner( +pub(crate) fn shutdown_inner( state: &DispatcherState, ) -> Result<(), Result> { use ShutdownState::*; diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index 9936de96..d96ef0b0 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -62,6 +62,10 @@ pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; use tokio::sync::mpsc::UnboundedReceiver; pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType}; +pub(crate) use dispatcher::{ + shutdown_check_timeout_for, shutdown_inner, DispatcherState, ShutdownState, +}; + /// A type of a stream, consumed by [`Dispatcher`]'s handlers. /// /// [`Dispatcher`]: crate::dispatching::Dispatcher diff --git a/src/dispatching2/dispatcher.rs b/src/dispatching2/dispatcher.rs index 30167507..b43e29fc 100644 --- a/src/dispatching2/dispatcher.rs +++ b/src/dispatching2/dispatcher.rs @@ -1,44 +1,264 @@ use crate::{ - dispatching2::handlers::{Handlers, Replaced, UpdateHandler}, - types, - types::Update, + dispatching::{ + shutdown_check_timeout_for, shutdown_inner, stop_token::StopToken, update_listeners, + update_listeners::UpdateListener, DispatcherState, ShutdownToken, + }, + error_handlers::{ErrorHandler, LoggingErrorHandler}, + requests::Requester, + types::{AllowedUpdate, Update, UpdateKind}, }; -use dptree::{di::DependencySupplier, Replace}; +use dptree::di::DependencyMap; +use futures::StreamExt; +use std::{collections::HashSet, convert::Infallible, fmt::Debug, ops::ControlFlow, sync::Arc}; +use tokio::{sync::Notify, time::timeout}; -pub struct Dispatcher -where - C: Replace, -{ - handlers: Handlers, +pub struct Dispatcher { + requester: R, + dependencies: DependencyMap, + + handler: UpdateHandler, + default_handler: DefaultHandler, + allowed_updates: HashSet, + + state: Arc, + shutdown_notify_back: Arc, } -impl Dispatcher +// TODO: it is allowed to return message as response on telegram request in +// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates +// FIXME: remove 'static lifetime? +pub type UpdateHandler = dptree::Handler<'static, DependencyMap, Result<(), Err>>; +pub type DefaultHandler = dptree::Handler<'static, DependencyMap, (), Infallible>; + +macro_rules! make_parser { + ($kind:ident) => { + dptree::parser(|upd: &Update| match &upd.kind { + UpdateKind::$kind(u) => Some(u.clone()), + _ => None, + }) + }; +} + +impl Dispatcher where - C: DependencySupplier - + Send - + Sync - + 'static - + Replace, - IR: Send + Sync + 'static + Replace, + R: 'static, Err: Send + Sync + 'static, { - pub fn new() -> Self { - Dispatcher { handlers: Handlers::new() } + pub fn new(requester: R) -> Self { + Dispatcher { + requester, + dependencies: DependencyMap::new(), + handler: dptree::entry(), + default_handler: dptree::endpoint(|update: Arc| async move { + log::warn!("Unhandled update: {:?}", update.as_ref()) + }), + allowed_updates: Default::default(), + state: Arc::new(Default::default()), + shutdown_notify_back: Arc::new(Default::default()), + } + } + + /// Setup the `^C` handler which [`shutdown`]s dispatching. + /// + /// [`shutdown`]: ShutdownToken::shutdown + #[cfg(feature = "ctrlc_handler")] + #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] + pub fn setup_ctrlc_handler(self) -> Self { + let state = Arc::clone(&self.state); + tokio::spawn(async move { + loop { + tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); + + match shutdown_inner(&state) { + Ok(()) => log::info!("^C received, trying to shutdown the dispatcher..."), + Err(Ok(_)) => { + log::info!( + "^C received, the dispatcher is already shutting down, ignoring the \ + signal" + ) + } + Err(Err(_)) => { + log::info!("^C received, the dispatcher isn't running, ignoring the signal") + } + } + } + }); + + self + } + + /// Returns a shutdown token, which can later be used to shutdown + /// dispatching. + pub fn shutdown_token(&self) -> ShutdownToken { + ShutdownToken { + dispatcher_state: Arc::clone(&self.state), + shutdown_notify_back: Arc::clone(&self.shutdown_notify_back), + } + } + + /// Starts your bot with the default parameters. + /// + /// The default parameters are a long polling update listener and log all + /// errors produced by this listener). + /// + /// Please note that after shutting down (either because of [`shutdown`], + /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers + /// will be gone. As such, to restart listening you need to re-add + /// handlers. + /// + /// [`shutdown`]: ShutdownToken::shutdown + /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler + pub async fn dispatch(&mut self) + where + R: Requester + Clone, + ::GetUpdatesFaultTolerant: Send, + { + let listener = update_listeners::polling_default(self.requester.clone()).await; + let error_handler = + LoggingErrorHandler::with_custom_text("An error from the update listener"); + + self.dispatch_with_listener(listener, error_handler).await; + } + + /// Starts your bot with custom `update_listener` and + /// `update_listener_error_handler`. + /// + /// Please note that after shutting down (either because of [`shutdown`], + /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers + /// will be gone. As such, to restart listening you need to re-add + /// handlers. + /// + /// [`shutdown`]: ShutdownToken::shutdown + /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler + pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( + &'a mut self, + mut update_listener: UListener, + update_listener_error_handler: Arc, + ) where + UListener: UpdateListener + 'a, + Eh: ErrorHandler + 'a, + ListenerE: Debug, + R: Requester + Clone, + { + use crate::dispatching::ShutdownState::*; + + update_listener.hint_allowed_updates(&mut self.allowed_updates.clone().into_iter()); + + let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); + let mut stop_token = Some(update_listener.stop_token()); + + if let Err(actual) = self.state.compare_exchange(Idle, Running) { + unreachable!( + "Dispatching is already running: expected `{:?}` state, found `{:?}`", + Idle, actual + ); + } + + { + let stream = update_listener.as_stream(); + tokio::pin!(stream); + + loop { + // False positive + #[allow(clippy::collapsible_match)] + if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await { + match upd { + None => break, + Some(upd) => self.process_update(upd, &update_listener_error_handler).await, + } + } + + if let ShuttingDown = self.state.load() { + if let Some(token) = stop_token.take() { + log::debug!("Start shutting down dispatching..."); + token.stop(); + break; + } + } + } + } + + if let ShuttingDown = self.state.load() { + // Stopped because of a `shutdown` call. + + // Notify `shutdown`s that we finished + self.shutdown_notify_back.notify_waiters(); + log::info!("Dispatching has been shut down."); + } else { + log::info!("Dispatching has been stopped (listener returned `None`)."); + } + + self.state.store(Idle); + } + + async fn process_update( + &self, + update: Result, + err_handler: &Arc, + ) where + LErrHandler: ErrorHandler, + { + match update { + Ok(upd) => { + let mut deps = self.dependencies.clone(); + deps.insert(upd); + match self.handler.dispatch(deps).await { + ControlFlow::Break(Ok(())) => {} + ControlFlow::Break(Err(_err)) => todo!("error handler"), + ControlFlow::Continue(deps) => { + match self + .default_handler + .clone() + .execute(deps, |next| async move { match next {} }) + .await + { + ControlFlow::Break(()) => {} + ControlFlow::Continue(_) => unreachable!( + "This is unreachable due to Infallible type in the DefaultHandler \ + type" + ), + } + } + } + } + Err(err) => err_handler.clone().handle_error(err).await, + } + } + + pub fn handler(self, handler: UpdateHandler) -> Self { + Dispatcher { handler: self.handler.branch(handler), ..self } + } + + // Specify handler that will be called if other handlers was not handle the + // update. + pub fn default_handler(self, handler: UpdateHandler) -> Self { + Dispatcher { handler: self.handler.branch(handler), ..self } + } + + // Specify dependencies that can be used inside of handlers. + pub fn dependencies(self, dependencies: DependencyMap) -> Self { + Dispatcher { dependencies, ..self } } pub fn message_handler( mut self, - handler: UpdateHandler, Err>, + make_handler: impl FnOnce(UpdateHandler) -> UpdateHandler, ) -> Self { - self.handlers.message_handler(handler); - self + self.allowed_updates.insert(AllowedUpdate::Message); + + let parser = make_parser!(Message); + let handler = make_handler(parser); + self.handler(handler) } pub fn edited_message_handler( mut self, - handler: UpdateHandler, Err>, + make_handler: impl FnOnce(UpdateHandler) -> UpdateHandler, ) -> Self { - self.handlers.edited_message_handler(handler); - self + self.allowed_updates.insert(AllowedUpdate::EditedMessage); + + let parser = make_parser!(EditedMessage); + let handler = make_handler(parser); + self.handler(handler) } } diff --git a/src/dispatching2/handlers.rs b/src/dispatching2/handlers.rs deleted file mode 100644 index d63762ed..00000000 --- a/src/dispatching2/handlers.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::{ - types, - types::{Update, UpdateKind}, -}; -use dptree::{di::DependencySupplier, Handler, Replace}; - -pub type Replaced = >::Out; - -pub struct Handlers -where - C: Replace, -{ - message_handler: UpdateHandler>, - edited_message_handler: UpdateHandler>, -} - -macro_rules! new_handler { - ($kind:ident) => { - dptree::parser(|upd: &Update| match &upd.kind { - UpdateKind::$kind(u) => Some(u.clone()), - _ => None, - }) - }; -} - -impl Handlers -where - C: DependencySupplier - + Send - + Sync - + 'static - + Replace, - IR: Send + Sync + 'static + Replace, - Err: Send + Sync + 'static, -{ - pub fn new() -> Self { - Handlers { - message_handler: new_handler!(Message), - edited_message_handler: new_handler!(EditedMessage), - } - } - - pub fn message_handler(&mut self, handler: UpdateHandler, Err>) { - self.message_handler = self.message_handler.clone().branch(handler); - } - - pub fn edited_message_handler( - &mut self, - handler: UpdateHandler, Err>, - ) { - self.edited_message_handler = self.edited_message_handler.clone().branch(handler); - } -} - -// TODO: it is allowed to return message as answer on telegram request in -// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates -pub type UpdateHandler = Handler<'static, C, Result<(), Err>, IR>; diff --git a/src/dispatching2/mod.rs b/src/dispatching2/mod.rs index 06c2e110..e89c01c1 100644 --- a/src/dispatching2/mod.rs +++ b/src/dispatching2/mod.rs @@ -1,4 +1,3 @@ mod dispatcher; -mod handlers; pub use dispatcher::Dispatcher;