diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d4310c6..4a7b64c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`. - `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`. - `Dispatcher::shutdown` function. + - `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)). ### Changed @@ -21,7 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). - `polling` and `polling_default` now require `R: 'static` - Refactor `UpdateListener` trait: - - Add a `stop` function that allows stopping the listener. + - Add a `stop` function that allows stopping the listener ([issue 166](https://github.com/teloxide/teloxide/issues/166)). - Remove blanked implementation. - Remove `Stream` from super traits. - Add `AsUpdateStream` to super traits. diff --git a/Cargo.toml b/Cargo.toml index 509ca839..ec2eba2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ bincode-serializer = ["bincode"] frunk- = ["frunk"] macros = ["teloxide-macros"] +ctrlc_handler = ["tokio/signal"] + native-tls = ["teloxide-core/native-tls"] rustls = ["teloxide-core/rustls"] auto-send = ["teloxide-core/auto_send"] @@ -51,6 +53,7 @@ full = [ "bincode-serializer", "frunk", "macros", + "ctrlc_handler", "teloxide-core/full", "native-tls", "rustls", diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index ea5d5844..bf088394 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -75,7 +75,7 @@ pub struct Dispatcher { my_chat_members_queue: Tx, chat_members_queue: Tx, - shutdown_state: AtomicShutdownState, + shutdown_state: Arc, shutdown_notify_back: Notify, } @@ -101,9 +101,9 @@ where poll_answers_queue: None, my_chat_members_queue: None, chat_members_queue: None, - shutdown_state: AtomicShutdownState { + shutdown_state: Arc::new(AtomicShutdownState { inner: AtomicU8::new(ShutdownState::IsntRunning as _), - }, + }), shutdown_notify_back: Notify::new(), } } @@ -124,6 +124,25 @@ where Some(tx) } + /// Setup `^C` handler which [`shutdown`]s dispatching. + /// + /// [`shutdown`]: Dispatcher::shutdown + #[cfg(feature = "ctrlc_handler")] + #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] + pub fn setup_ctrlc_handler(self) -> Self { + let shutdown_state = Arc::clone(&self.shutdown_state); + tokio::spawn(async move { + loop { + tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); + + // If dispatcher wasn't running, then there is nothing to do + Self::shutdown_inner(&shutdown_state).ok(); + } + }); + + self + } + #[must_use] pub fn messages_handler(mut self, h: H) -> Self where @@ -293,6 +312,7 @@ where } if let ShuttingDown = self.shutdown_state.load() { + log::debug!("Start shutting down dispatching"); break; } } @@ -310,25 +330,31 @@ where // Notify `shutdown`s that we finished self.shutdown_notify_back.notify_waiters(); + log::debug!("Dispatching shut down"); + } else { + log::debug!("Dispatching stopped (listner returned `None`)"); } self.shutdown_state.store(IsntRunning); } - /// Tries shutting down dispatching. + /// Tries to shutdown dispatching. /// - /// Returns error if this dispather isn't dispathing at the moment. + /// Returns error if this dispather isn't dispatching at the moment. /// - /// If you don't need to wait for shutdown returned future can be ignored. + /// If you don't need to wait for shutdown, returned future can be ignored. pub fn shutdown(&self) -> Result + '_, ShutdownError> { + Self::shutdown_inner(&self.shutdown_state) + .map(|()| async move { self.shutdown_notify_back.notified().await }) + } + + fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { use ShutdownState::*; - let res = self.shutdown_state.compare_exchange(Running, ShuttingDown); + let res = shutdown_state.compare_exchange(Running, ShuttingDown); match res { - Ok(_) | Err(ShuttingDown) => { - Ok(async move { self.shutdown_notify_back.notified().await }) - } + Ok(_) | Err(ShuttingDown) => Ok(()), Err(IsntRunning) => return Err(ShutdownError::IsntRunning), Err(Running) => unreachable!(), } diff --git a/src/features.txt b/src/features.txt index c0fa8e20..ee738058 100644 --- a/src/features.txt +++ b/src/features.txt @@ -9,6 +9,7 @@ | `macros` | Re-exports macros from [`teloxide-macros`]. | | `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). | | `rustls` | Enables the [`rustls`] TLS implementation. | +| `ctrlc_handler` | Enables [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. | | `auto-send` | Enables the `AutoSend` bot adaptor. | | `cache-me` | Enables the `CacheMe` bot adaptor. | | `frunk` | Enables [`teloxide::utils::UpState`]. |