Apply suggestions from the review: rename AtomicShutdownState => DispatcherState, IsntRunning => Idle

This commit is contained in:
Waffle 2021-06-26 23:04:36 +03:00
parent 1795cb22c2
commit 483e544223

View file

@ -55,7 +55,7 @@ pub struct Dispatcher<R> {
running_handlers: FuturesUnordered<JoinHandle<()>>,
shutdown_state: Arc<AtomicShutdownState>,
state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
}
@ -82,7 +82,7 @@ where
my_chat_members_queue: None,
chat_members_queue: None,
running_handlers: FuturesUnordered::new(),
shutdown_state: <_>::default(),
state: <_>::default(),
shutdown_notify_back: <_>::default(),
}
}
@ -108,7 +108,7 @@ where
#[cfg(feature = "ctrlc_handler")]
#[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))]
pub fn setup_ctrlc_handler(self) -> Self {
let shutdown_state = Arc::clone(&self.shutdown_state);
let state = Arc::clone(&self.state);
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
@ -116,7 +116,7 @@ where
log::debug!("^C receieved, trying to shutdown dispatcher");
// If dispatcher wasn't running, then there is nothing to do
shutdown_inner(&shutdown_state).ok();
shutdown_inner(&state).ok();
}
});
@ -287,10 +287,10 @@ where
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token());
if let Err(actual) = self.shutdown_state.compare_exchange(IsntRunning, Running) {
if let Err(actual) = self.state.compare_exchange(Idle, Running) {
unreachable!(
"Dispatching is already running: expected `IsntRunning` state, found `{:?}`",
actual
"Dispatching is already running: expected `{:?}` state, found `{:?}`",
Idle, actual
);
}
@ -306,7 +306,7 @@ where
}
}
if let ShuttingDown = self.shutdown_state.load() {
if let ShuttingDown = self.state.load() {
if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching");
token.stop();
@ -317,7 +317,7 @@ where
self.wait_for_handlers().await;
if let ShuttingDown = self.shutdown_state.load() {
if let ShuttingDown = self.state.load() {
// Stopped because of a `shutdown` call.
// Notify `shutdown`s that we finished
@ -327,13 +327,13 @@ where
log::debug!("Dispatching stopped (listener returned `None`)");
}
self.shutdown_state.store(IsntRunning);
self.state.store(Idle);
}
/// Returns shutdown token, which can later be used to shutdown dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
ShutdownToken {
shutdown_state: Arc::clone(&self.shutdown_state),
dispatcher_state: Arc::clone(&self.state),
shutdown_notify_back: Arc::clone(&self.shutdown_notify_back),
}
}
@ -461,7 +461,7 @@ where
/// A token which can be used to shutdown dispatcher.
#[derive(Clone)]
pub struct ShutdownToken {
shutdown_state: Arc<AtomicShutdownState>,
dispatcher_state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
}
@ -472,7 +472,7 @@ impl ShutdownToken {
///
/// If you don't need to wait for shutdown, returned future can be ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> {
shutdown_inner(&self.shutdown_state)
shutdown_inner(&self.dispatcher_state)
.map(|()| async move { self.shutdown_notify_back.notified().await })
}
}
@ -480,14 +480,15 @@ impl ShutdownToken {
/// Error occured while trying to shutdown dispatcher.
#[derive(Debug)]
pub enum ShutdownError {
IsntRunning,
/// Couldn"t stop dispatcher since it wasn't running.
Idle,
}
struct AtomicShutdownState {
struct DispatcherState {
inner: AtomicU8,
}
impl AtomicShutdownState {
impl DispatcherState {
fn load(&self) -> ShutdownState {
ShutdownState::from_u8(self.inner.load(Ordering::SeqCst))
}
@ -508,9 +509,9 @@ impl AtomicShutdownState {
}
}
impl Default for AtomicShutdownState {
impl Default for DispatcherState {
fn default() -> Self {
Self { inner: AtomicU8::new(ShutdownState::IsntRunning as _) }
Self { inner: AtomicU8::new(ShutdownState::Idle as _) }
}
}
@ -519,19 +520,19 @@ impl Default for AtomicShutdownState {
enum ShutdownState {
Running,
ShuttingDown,
IsntRunning,
Idle,
}
impl ShutdownState {
fn from_u8(n: u8) -> Self {
const RUNNING: u8 = ShutdownState::Running as u8;
const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8;
const ISNT_RUNNING: u8 = ShutdownState::IsntRunning as u8;
const IDLE: u8 = ShutdownState::Idle as u8;
match n {
RUNNING => ShutdownState::Running,
SHUTTING_DOWN => ShutdownState::ShuttingDown,
ISNT_RUNNING => ShutdownState::IsntRunning,
IDLE => ShutdownState::Idle,
_ => unreachable!(),
}
}
@ -549,14 +550,14 @@ fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Du
shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout)
}
fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> {
fn shutdown_inner(state: &DispatcherState) -> Result<(), ShutdownError> {
use ShutdownState::*;
let res = shutdown_state.compare_exchange(Running, ShuttingDown);
let res = state.compare_exchange(Running, ShuttingDown);
match res {
Ok(_) | Err(ShuttingDown) => Ok(()),
Err(IsntRunning) => Err(ShutdownError::IsntRunning),
Err(Idle) => Err(ShutdownError::Idle),
Err(Running) => unreachable!(),
}
}