Add Dispatcher::setup_ctrlc_handler function.

This function sets up `^C` handler which shuts down dispatching.
This commit is contained in:
Waffle 2021-05-18 18:30:57 +03:00
parent 41a95079b2
commit f0de55ad55
4 changed files with 42 additions and 11 deletions

View file

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`. - `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`.
- `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`. - `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`.
- `Dispatcher::shutdown` function. - `Dispatcher::shutdown` function.
- `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)).
### Changed ### Changed
@ -21,7 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)).
- `polling` and `polling_default` now require `R: 'static` - `polling` and `polling_default` now require `R: 'static`
- Refactor `UpdateListener` trait: - Refactor `UpdateListener` trait:
- Add a `stop` function that allows stopping the listener. - Add a `stop` function that allows stopping the listener ([issue 166](https://github.com/teloxide/teloxide/issues/166)).
- Remove blanked implementation. - Remove blanked implementation.
- Remove `Stream` from super traits. - Remove `Stream` from super traits.
- Add `AsUpdateStream` to super traits. - Add `AsUpdateStream` to super traits.

View file

@ -34,6 +34,8 @@ bincode-serializer = ["bincode"]
frunk- = ["frunk"] frunk- = ["frunk"]
macros = ["teloxide-macros"] macros = ["teloxide-macros"]
ctrlc_handler = ["tokio/signal"]
native-tls = ["teloxide-core/native-tls"] native-tls = ["teloxide-core/native-tls"]
rustls = ["teloxide-core/rustls"] rustls = ["teloxide-core/rustls"]
auto-send = ["teloxide-core/auto_send"] auto-send = ["teloxide-core/auto_send"]
@ -51,6 +53,7 @@ full = [
"bincode-serializer", "bincode-serializer",
"frunk", "frunk",
"macros", "macros",
"ctrlc_handler",
"teloxide-core/full", "teloxide-core/full",
"native-tls", "native-tls",
"rustls", "rustls",

View file

@ -75,7 +75,7 @@ pub struct Dispatcher<R> {
my_chat_members_queue: Tx<R, ChatMemberUpdated>, my_chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_members_queue: Tx<R, ChatMemberUpdated>, chat_members_queue: Tx<R, ChatMemberUpdated>,
shutdown_state: AtomicShutdownState, shutdown_state: Arc<AtomicShutdownState>,
shutdown_notify_back: Notify, shutdown_notify_back: Notify,
} }
@ -101,9 +101,9 @@ where
poll_answers_queue: None, poll_answers_queue: None,
my_chat_members_queue: None, my_chat_members_queue: None,
chat_members_queue: None, chat_members_queue: None,
shutdown_state: AtomicShutdownState { shutdown_state: Arc::new(AtomicShutdownState {
inner: AtomicU8::new(ShutdownState::IsntRunning as _), inner: AtomicU8::new(ShutdownState::IsntRunning as _),
}, }),
shutdown_notify_back: Notify::new(), shutdown_notify_back: Notify::new(),
} }
} }
@ -124,6 +124,25 @@ where
Some(tx) Some(tx)
} }
/// Setup `^C` handler which [`shutdown`]s dispatching.
///
/// [`shutdown`]: Dispatcher::shutdown
#[cfg(feature = "ctrlc_handler")]
#[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))]
pub fn setup_ctrlc_handler(self) -> Self {
let shutdown_state = Arc::clone(&self.shutdown_state);
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
// If dispatcher wasn't running, then there is nothing to do
Self::shutdown_inner(&shutdown_state).ok();
}
});
self
}
#[must_use] #[must_use]
pub fn messages_handler<H>(mut self, h: H) -> Self pub fn messages_handler<H>(mut self, h: H) -> Self
where where
@ -293,6 +312,7 @@ where
} }
if let ShuttingDown = self.shutdown_state.load() { if let ShuttingDown = self.shutdown_state.load() {
log::debug!("Start shutting down dispatching");
break; break;
} }
} }
@ -310,25 +330,31 @@ where
// Notify `shutdown`s that we finished // Notify `shutdown`s that we finished
self.shutdown_notify_back.notify_waiters(); self.shutdown_notify_back.notify_waiters();
log::debug!("Dispatching shut down");
} else {
log::debug!("Dispatching stopped (listner returned `None`)");
} }
self.shutdown_state.store(IsntRunning); self.shutdown_state.store(IsntRunning);
} }
/// Tries shutting down dispatching. /// Tries to shutdown dispatching.
/// ///
/// Returns error if this dispather isn't dispathing at the moment. /// Returns error if this dispather isn't dispatching at the moment.
/// ///
/// If you don't need to wait for shutdown returned future can be ignored. /// If you don't need to wait for shutdown, returned future can be ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> { pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> {
Self::shutdown_inner(&self.shutdown_state)
.map(|()| async move { self.shutdown_notify_back.notified().await })
}
fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> {
use ShutdownState::*; use ShutdownState::*;
let res = self.shutdown_state.compare_exchange(Running, ShuttingDown); let res = shutdown_state.compare_exchange(Running, ShuttingDown);
match res { match res {
Ok(_) | Err(ShuttingDown) => { Ok(_) | Err(ShuttingDown) => Ok(()),
Ok(async move { self.shutdown_notify_back.notified().await })
}
Err(IsntRunning) => return Err(ShutdownError::IsntRunning), Err(IsntRunning) => return Err(ShutdownError::IsntRunning),
Err(Running) => unreachable!(), Err(Running) => unreachable!(),
} }

View file

@ -9,6 +9,7 @@
| `macros` | Re-exports macros from [`teloxide-macros`]. | | `macros` | Re-exports macros from [`teloxide-macros`]. |
| `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). | | `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). |
| `rustls` | Enables the [`rustls`] TLS implementation. | | `rustls` | Enables the [`rustls`] TLS implementation. |
| `ctrlc_handler` | Enables [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. |
| `auto-send` | Enables the `AutoSend` bot adaptor. | | `auto-send` | Enables the `AutoSend` bot adaptor. |
| `cache-me` | Enables the `CacheMe` bot adaptor. | | `cache-me` | Enables the `CacheMe` bot adaptor. |
| `frunk` | Enables [`teloxide::utils::UpState`]. | | `frunk` | Enables [`teloxide::utils::UpState`]. |