diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 820a0f17..6a94258b 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -55,7 +55,7 @@ pub struct Dispatcher<R> { running_handlers: FuturesUnordered<JoinHandle<()>>, - shutdown_state: Arc<AtomicShutdownState>, + state: Arc<DispatcherState>, shutdown_notify_back: Arc<Notify>, } @@ -82,7 +82,7 @@ where my_chat_members_queue: None, chat_members_queue: None, running_handlers: FuturesUnordered::new(), - shutdown_state: <_>::default(), + state: <_>::default(), shutdown_notify_back: <_>::default(), } } @@ -108,7 +108,7 @@ where #[cfg(feature = "ctrlc_handler")] #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] pub fn setup_ctrlc_handler(self) -> Self { - let shutdown_state = Arc::clone(&self.shutdown_state); + let state = Arc::clone(&self.state); tokio::spawn(async move { loop { tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); @@ -116,7 +116,7 @@ where log::debug!("^C receieved, trying to shutdown dispatcher"); // If dispatcher wasn't running, then there is nothing to do - shutdown_inner(&shutdown_state).ok(); + shutdown_inner(&state).ok(); } }); @@ -287,10 +287,10 @@ where let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let mut stop_token = Some(update_listener.stop_token()); - if let Err(actual) = self.shutdown_state.compare_exchange(IsntRunning, Running) { + if let Err(actual) = self.state.compare_exchange(Idle, Running) { unreachable!( - "Dispatching is already running: expected `IsntRunning` state, found `{:?}`", - actual + "Dispatching is already running: expected `{:?}` state, found `{:?}`", + Idle, actual ); } @@ -306,7 +306,7 @@ where } } - if let ShuttingDown = self.shutdown_state.load() { + if let ShuttingDown = self.state.load() { if let Some(token) = stop_token.take() { log::debug!("Start shutting down dispatching"); token.stop(); @@ -317,7 +317,7 @@ where self.wait_for_handlers().await; - if let ShuttingDown = self.shutdown_state.load() { + if let ShuttingDown = self.state.load() { // Stopped because of a `shutdown` call. // Notify `shutdown`s that we finished @@ -327,13 +327,13 @@ where log::debug!("Dispatching stopped (listener returned `None`)"); } - self.shutdown_state.store(IsntRunning); + self.state.store(Idle); } /// Returns shutdown token, which can later be used to shutdown dispatching. pub fn shutdown_token(&self) -> ShutdownToken { ShutdownToken { - shutdown_state: Arc::clone(&self.shutdown_state), + dispatcher_state: Arc::clone(&self.state), shutdown_notify_back: Arc::clone(&self.shutdown_notify_back), } } @@ -461,7 +461,7 @@ where /// A token which can be used to shutdown dispatcher. #[derive(Clone)] pub struct ShutdownToken { - shutdown_state: Arc<AtomicShutdownState>, + dispatcher_state: Arc<DispatcherState>, shutdown_notify_back: Arc<Notify>, } @@ -472,7 +472,7 @@ impl ShutdownToken { /// /// If you don't need to wait for shutdown, returned future can be ignored. pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> { - shutdown_inner(&self.shutdown_state) + shutdown_inner(&self.dispatcher_state) .map(|()| async move { self.shutdown_notify_back.notified().await }) } } @@ -480,14 +480,15 @@ impl ShutdownToken { /// Error occured while trying to shutdown dispatcher. #[derive(Debug)] pub enum ShutdownError { - IsntRunning, + /// Couldn"t stop dispatcher since it wasn't running. + Idle, } -struct AtomicShutdownState { +struct DispatcherState { inner: AtomicU8, } -impl AtomicShutdownState { +impl DispatcherState { fn load(&self) -> ShutdownState { ShutdownState::from_u8(self.inner.load(Ordering::SeqCst)) } @@ -508,9 +509,9 @@ impl AtomicShutdownState { } } -impl Default for AtomicShutdownState { +impl Default for DispatcherState { fn default() -> Self { - Self { inner: AtomicU8::new(ShutdownState::IsntRunning as _) } + Self { inner: AtomicU8::new(ShutdownState::Idle as _) } } } @@ -519,19 +520,19 @@ impl Default for AtomicShutdownState { enum ShutdownState { Running, ShuttingDown, - IsntRunning, + Idle, } impl ShutdownState { fn from_u8(n: u8) -> Self { const RUNNING: u8 = ShutdownState::Running as u8; const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8; - const ISNT_RUNNING: u8 = ShutdownState::IsntRunning as u8; + const IDLE: u8 = ShutdownState::Idle as u8; match n { RUNNING => ShutdownState::Running, SHUTTING_DOWN => ShutdownState::ShuttingDown, - ISNT_RUNNING => ShutdownState::IsntRunning, + IDLE => ShutdownState::Idle, _ => unreachable!(), } } @@ -549,14 +550,14 @@ fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Du shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) } -fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { +fn shutdown_inner(state: &DispatcherState) -> Result<(), ShutdownError> { use ShutdownState::*; - let res = shutdown_state.compare_exchange(Running, ShuttingDown); + let res = state.compare_exchange(Running, ShuttingDown); match res { Ok(_) | Err(ShuttingDown) => Ok(()), - Err(IsntRunning) => Err(ShutdownError::IsntRunning), + Err(Idle) => Err(ShutdownError::Idle), Err(Running) => unreachable!(), } }