Don't use timeout to check ShutdownToken

This commit is contained in:
Maybe Waffle 2023-09-25 20:02:57 +04:00
parent 836080523f
commit 07e08bef6c
2 changed files with 22 additions and 27 deletions

View file

@ -7,12 +7,10 @@ use crate::{
requests::{Request, Requester}, requests::{Request, Requester},
types::{Update, UpdateKind}, types::{Update, UpdateKind},
update_listeners::{self, UpdateListener}, update_listeners::{self, UpdateListener},
utils::shutdown_token::shutdown_check_timeout_for,
}; };
use dptree::di::{DependencyMap, DependencySupplier}; use dptree::di::{DependencyMap, DependencySupplier};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use tokio::time::timeout;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use std::{ use std::{
@ -312,7 +310,6 @@ where
log::debug!("hinting allowed updates: {:?}", allowed_updates); log::debug!("hinting allowed updates: {:?}", allowed_updates);
update_listener.hint_allowed_updates(&mut allowed_updates.into_iter()); update_listener.hint_allowed_updates(&mut allowed_updates.into_iter());
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token()); let mut stop_token = Some(update_listener.stop_token());
self.state.start_dispatching(); self.state.start_dispatching();
@ -324,19 +321,16 @@ where
loop { loop {
self.remove_inactive_workers_if_needed().await; self.remove_inactive_workers_if_needed().await;
// False positive tokio::select! {
#[allow(clippy::collapsible_match)] upd = stream.next() => match upd {
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
match upd {
None => break, None => break,
Some(upd) => self.process_update(upd, &update_listener_error_handler).await, Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
} },
} () = self.state.wait_for_changes() => if self.state.is_shutting_down() {
if let Some(token) = stop_token.take() {
if self.state.is_shutting_down() { log::debug!("Start shutting down dispatching...");
if let Some(token) = stop_token.take() { token.stop();
log::debug!("Start shutting down dispatching..."); }
token.stop();
} }
} }
} }

View file

@ -5,18 +5,16 @@ use std::{
atomic::{AtomicU8, Ordering}, atomic::{AtomicU8, Ordering},
Arc, Arc,
}, },
time::Duration,
}; };
use tokio::sync::Notify; use tokio::sync::Notify;
use crate::update_listeners::UpdateListener;
/// A token which used to shutdown [`Dispatcher`]. /// A token which used to shutdown [`Dispatcher`].
/// ///
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher
#[derive(Clone)] #[derive(Clone)]
pub struct ShutdownToken { pub struct ShutdownToken {
// FIXME: use a single arc
dispatcher_state: Arc<DispatcherState>, dispatcher_state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>, shutdown_notify_back: Arc<Notify>,
} }
@ -49,11 +47,16 @@ impl ShutdownToken {
Self { Self {
dispatcher_state: Arc::new(DispatcherState { dispatcher_state: Arc::new(DispatcherState {
inner: AtomicU8::new(ShutdownState::Idle as _), inner: AtomicU8::new(ShutdownState::Idle as _),
notify: <_>::default(),
}), }),
shutdown_notify_back: <_>::default(), shutdown_notify_back: <_>::default(),
} }
} }
pub(crate) async fn wait_for_changes(&self) {
self.dispatcher_state.notify.notified().await;
}
pub(crate) fn start_dispatching(&self) { pub(crate) fn start_dispatching(&self) {
if let Err(actual) = if let Err(actual) =
self.dispatcher_state.compare_exchange(ShutdownState::Idle, ShutdownState::Running) self.dispatcher_state.compare_exchange(ShutdownState::Idle, ShutdownState::Running)
@ -93,27 +96,20 @@ impl fmt::Display for IdleShutdownError {
impl std::error::Error for IdleShutdownError {} impl std::error::Error for IdleShutdownError {}
pub(crate) fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration {
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1);
const DZERO: Duration = Duration::ZERO;
let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO);
shutdown_check_timeout.saturating_add(MIN_SHUTDOWN_CHECK_TIMEOUT)
}
struct DispatcherState { struct DispatcherState {
inner: AtomicU8, inner: AtomicU8,
notify: Notify,
} }
impl DispatcherState { impl DispatcherState {
// Ordering::Relaxed: only one atomic variable, nothing to synchronize. // Ordering::Relaxed: only one atomic variable, nothing to synchronize.
fn load(&self) -> ShutdownState { fn load(&self) -> ShutdownState {
ShutdownState::from_u8(self.inner.load(Ordering::Relaxed)) ShutdownState::from_u8(self.inner.load(Ordering::Relaxed))
} }
fn store(&self, new: ShutdownState) { fn store(&self, new: ShutdownState) {
self.inner.store(new as _, Ordering::Relaxed) self.inner.store(new as _, Ordering::Relaxed);
self.notify.notify_waiters();
} }
fn compare_exchange( fn compare_exchange(
@ -125,6 +121,11 @@ impl DispatcherState {
.compare_exchange(current as _, new as _, Ordering::Relaxed, Ordering::Relaxed) .compare_exchange(current as _, new as _, Ordering::Relaxed, Ordering::Relaxed)
.map(ShutdownState::from_u8) .map(ShutdownState::from_u8)
.map_err(ShutdownState::from_u8) .map_err(ShutdownState::from_u8)
// FIXME: `Result::inspect` when :(
.map(|st| {
self.notify.notify_waiters();
st
})
} }
} }