Minor Dispatcher cleanup

This commit is contained in:
Waffle 2021-06-26 15:08:32 +03:00
parent 7cfb207b44
commit 8e3ef4ab67

View file

@ -1,13 +1,3 @@
use crate::{
dispatching::{
stop_token::StopToken,
update_listeners::{self, UpdateListener},
DispatcherHandler, UpdateWithCx,
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
};
use core::panic;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use std::{ use std::{
fmt::Debug, fmt::Debug,
sync::{ sync::{
@ -16,6 +6,17 @@ use std::{
}, },
time::Duration, time::Duration,
}; };
use crate::{
dispatching::{
stop_token::StopToken,
update_listeners::{self, UpdateListener},
DispatcherHandler, UpdateWithCx,
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use teloxide_core::{ use teloxide_core::{
requests::Requester, requests::Requester,
types::{ types::{
@ -81,15 +82,12 @@ where
my_chat_members_queue: None, my_chat_members_queue: None,
chat_members_queue: None, chat_members_queue: None,
running_handlers: FuturesUnordered::new(), running_handlers: FuturesUnordered::new(),
shutdown_state: Arc::new(AtomicShutdownState { shutdown_state: <_>::default(),
inner: AtomicU8::new(ShutdownState::IsntRunning as _), shutdown_notify_back: <_>::default(),
}),
shutdown_notify_back: Arc::new(Notify::new()),
} }
} }
#[must_use] #[must_use]
#[allow(clippy::unnecessary_wraps)]
fn new_tx<H, Upd>(&mut self, h: H) -> Tx<R, Upd> fn new_tx<H, Upd>(&mut self, h: H) -> Tx<R, Upd>
where where
H: DispatcherHandler<R, Upd> + Send + 'static, H: DispatcherHandler<R, Upd> + Send + 'static,
@ -97,10 +95,7 @@ where
R: Send + 'static, R: Send + 'static,
{ {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let join_handle = tokio::spawn(async move { let join_handle = tokio::spawn(h.handle(rx));
let fut = h.handle(rx);
fut.await;
});
self.running_handlers.push(join_handle); self.running_handlers.push(join_handle);
@ -251,7 +246,8 @@ where
/// errors produced by this listener). /// errors produced by this listener).
/// ///
/// Please note that after shutting down (either because of [`shutdown`], /// Please note that after shutting down (either because of [`shutdown`],
/// [ctrlc signal], or `update_listener` returning `None`) /// [ctrlc signal], or `update_listener` returning `None`) all handlers will
/// be gone. As such, to restart listening you need to re-add handlers.
/// ///
/// [`shutdown`]; ShutdownToken::shutdown /// [`shutdown`]; ShutdownToken::shutdown
/// [ctrlc signal]: Dispatcher::setup_ctrlc_handler /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler
@ -260,11 +256,11 @@ where
R: Requester + Clone, R: Requester + Clone,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdatesFaultTolerant: Send,
{ {
self.dispatch_with_listener( let listener = update_listeners::polling_default(self.requester.clone()).await;
update_listeners::polling_default(self.requester.clone()).await, let error_handler =
LoggingErrorHandler::with_custom_text("An error from the update listener"), LoggingErrorHandler::with_custom_text("An error from the update listener");
)
.await; self.dispatch_with_listener(listener, error_handler).await;
} }
/// Starts your bot with custom `update_listener` and /// Starts your bot with custom `update_listener` and
@ -288,22 +284,14 @@ where
{ {
use ShutdownState::*; use ShutdownState::*;
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
// FIXME: replace this by just Duration::ZERO once 1.53 will be released
const DZERO: Duration = Duration::from_secs(0);
let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO);
// FIXME: replace this by just saturating_add once 1.53 will be released
let shutdown_check_timeout = shutdown_check_timeout
.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT)
.unwrap_or(shutdown_check_timeout);
let mut stop_token = Some(update_listener.stop_token()); let mut stop_token = Some(update_listener.stop_token());
if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) { if let Err(actual) = self.shutdown_state.compare_exchange(IsntRunning, Running) {
panic!("Dispatching is already running"); unreachable!(
"Dispatching is already running: expected `IsntRunning` state, found `{:?}`",
actual
);
} }
{ {
@ -327,23 +315,7 @@ where
} }
} }
// Drop all senders, so handlers can stop self.wait_for_handlers().await;
self.messages_queue.take();
self.edited_messages_queue.take();
self.channel_posts_queue.take();
self.edited_channel_posts_queue.take();
self.inline_queries_queue.take();
self.chosen_inline_results_queue.take();
self.callback_queries_queue.take();
self.shipping_queries_queue.take();
self.pre_checkout_queries_queue.take();
self.polls_queue.take();
self.poll_answers_queue.take();
self.my_chat_members_queue.take();
self.chat_members_queue.take();
// Wait untill all handlers process all updates
self.running_handlers.by_ref().for_each(|_| async {}).await;
if let ShuttingDown = self.shutdown_state.load() { if let ShuttingDown = self.shutdown_state.load() {
// Stopped because of a `shutdown` call. // Stopped because of a `shutdown` call.
@ -462,8 +434,32 @@ where
} }
} }
} }
async fn wait_for_handlers(&mut self) {
log::debug!("Waiting for handlers to finish");
// Drop all senders, so handlers can stop
self.messages_queue.take();
self.edited_messages_queue.take();
self.channel_posts_queue.take();
self.edited_channel_posts_queue.take();
self.inline_queries_queue.take();
self.chosen_inline_results_queue.take();
self.callback_queries_queue.take();
self.shipping_queries_queue.take();
self.pre_checkout_queries_queue.take();
self.polls_queue.take();
self.poll_answers_queue.take();
self.my_chat_members_queue.take();
self.chat_members_queue.take();
// Wait untill all handlers finish
self.running_handlers.by_ref().for_each(|_| async {}).await;
}
} }
/// A token which can be used to shutdown dispatcher.
#[derive(Clone)]
pub struct ShutdownToken { pub struct ShutdownToken {
shutdown_state: Arc<AtomicShutdownState>, shutdown_state: Arc<AtomicShutdownState>,
shutdown_notify_back: Arc<Notify>, shutdown_notify_back: Arc<Notify>,
@ -481,6 +477,7 @@ impl ShutdownToken {
} }
} }
/// Error occured while trying to shutdown dispatcher.
#[derive(Debug)] #[derive(Debug)]
pub enum ShutdownError { pub enum ShutdownError {
IsntRunning, IsntRunning,
@ -511,7 +508,14 @@ impl AtomicShutdownState {
} }
} }
impl Default for AtomicShutdownState {
fn default() -> Self {
Self { inner: AtomicU8::new(ShutdownState::IsntRunning as _) }
}
}
#[repr(u8)] #[repr(u8)]
#[derive(Debug)]
enum ShutdownState { enum ShutdownState {
Running, Running,
ShuttingDown, ShuttingDown,
@ -533,6 +537,18 @@ impl ShutdownState {
} }
} }
fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Duration {
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1);
// FIXME: replace this by just Duration::ZERO once 1.53 will be released
const DZERO: Duration = Duration::from_secs(0);
let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO);
// FIXME: replace this by just saturating_add once 1.53 will be released
shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout)
}
fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> {
use ShutdownState::*; use ShutdownState::*;