Add dispatcher shutdown token

This commit adds `ShutdownToken` which can be obrained throught
`Dispatcher::shutdown_token` and then later be used to shutdown dispatching.
This commit is contained in:
Waffle 2021-06-25 17:41:02 +03:00
parent c378d6ef4e
commit 569ef222fc

View file

@ -30,22 +30,6 @@ use tokio::{
type Tx<Upd, R> = Option<mpsc::UnboundedSender<UpdateWithCx<Upd, R>>>;
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
}
}
/// One dispatcher to rule them all.
///
/// See the [module-level documentation](crate::dispatching) for the design
@ -68,7 +52,7 @@ pub struct Dispatcher<R> {
chat_members_queue: Tx<R, ChatMemberUpdated>,
shutdown_state: Arc<AtomicShutdownState>,
shutdown_notify_back: Notify,
shutdown_notify_back: Arc<Notify>,
}
impl<R> Dispatcher<R>
@ -96,7 +80,7 @@ where
shutdown_state: Arc::new(AtomicShutdownState {
inner: AtomicU8::new(ShutdownState::IsntRunning as _),
}),
shutdown_notify_back: Notify::new(),
shutdown_notify_back: Arc::new(Notify::new()),
}
}
@ -128,7 +112,7 @@ where
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
// If dispatcher wasn't running, then there is nothing to do
Self::shutdown_inner(&shutdown_state).ok();
shutdown_inner(&shutdown_state).ok();
}
});
@ -340,19 +324,15 @@ where
///
/// If you don't need to wait for shutdown, returned future can be ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> {
Self::shutdown_inner(&self.shutdown_state)
shutdown_inner(&self.shutdown_state)
.map(|()| async move { self.shutdown_notify_back.notified().await })
}
fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> {
use ShutdownState::*;
let res = shutdown_state.compare_exchange(Running, ShuttingDown);
match res {
Ok(_) | Err(ShuttingDown) => Ok(()),
Err(IsntRunning) => Err(ShutdownError::IsntRunning),
Err(Running) => unreachable!(),
/// Returns shutdown token, which can later be used to shutdown dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
ShutdownToken {
shutdown_state: Arc::clone(&self.shutdown_state),
shutdown_notify_back: Arc::clone(&self.shutdown_notify_back),
}
}
@ -454,6 +434,23 @@ where
}
}
pub struct ShutdownToken {
shutdown_state: Arc<AtomicShutdownState>,
shutdown_notify_back: Arc<Notify>,
}
impl ShutdownToken {
/// Tries to shutdown dispatching.
///
/// Returns error if this dispather isn't dispatching at the moment.
///
/// If you don't need to wait for shutdown, returned future can be ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> {
shutdown_inner(&self.shutdown_state)
.map(|()| async move { self.shutdown_notify_back.notified().await })
}
}
#[derive(Debug)]
pub enum ShutdownError {
IsntRunning,
@ -505,3 +502,31 @@ impl ShutdownState {
}
}
}
fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> {
use ShutdownState::*;
let res = shutdown_state.compare_exchange(Running, ShuttingDown);
match res {
Ok(_) | Err(ShuttingDown) => Ok(()),
Err(IsntRunning) => Err(ShutdownError::IsntRunning),
Err(Running) => unreachable!(),
}
}
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
}
}