diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 0cbdcefe..a26e55c2 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,13 +1,3 @@ -use crate::{ - dispatching::{ - stop_token::StopToken, - update_listeners::{self, UpdateListener}, - DispatcherHandler, UpdateWithCx, - }, - error_handlers::{ErrorHandler, LoggingErrorHandler}, -}; -use core::panic; -use futures::{stream::FuturesUnordered, Future, StreamExt}; use std::{ fmt::Debug, sync::{ @@ -16,6 +6,17 @@ use std::{ }, time::Duration, }; + +use crate::{ + dispatching::{ + stop_token::StopToken, + update_listeners::{self, UpdateListener}, + DispatcherHandler, UpdateWithCx, + }, + error_handlers::{ErrorHandler, LoggingErrorHandler}, +}; + +use futures::{stream::FuturesUnordered, Future, StreamExt}; use teloxide_core::{ requests::Requester, types::{ @@ -81,15 +82,12 @@ where my_chat_members_queue: None, chat_members_queue: None, running_handlers: FuturesUnordered::new(), - shutdown_state: Arc::new(AtomicShutdownState { - inner: AtomicU8::new(ShutdownState::IsntRunning as _), - }), - shutdown_notify_back: Arc::new(Notify::new()), + shutdown_state: <_>::default(), + shutdown_notify_back: <_>::default(), } } #[must_use] - #[allow(clippy::unnecessary_wraps)] fn new_tx(&mut self, h: H) -> Tx where H: DispatcherHandler + Send + 'static, @@ -97,10 +95,7 @@ where R: Send + 'static, { let (tx, rx) = mpsc::unbounded_channel(); - let join_handle = tokio::spawn(async move { - let fut = h.handle(rx); - fut.await; - }); + let join_handle = tokio::spawn(h.handle(rx)); self.running_handlers.push(join_handle); @@ -251,7 +246,8 @@ where /// errors produced by this listener). /// /// Please note that after shutting down (either because of [`shutdown`], - /// [ctrlc signal], or `update_listener` returning `None`) + /// [ctrlc signal], or `update_listener` returning `None`) all handlers will + /// be gone. As such, to restart listening you need to re-add handlers. /// /// [`shutdown`]; ShutdownToken::shutdown /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler @@ -260,11 +256,11 @@ where R: Requester + Clone, ::GetUpdatesFaultTolerant: Send, { - self.dispatch_with_listener( - update_listeners::polling_default(self.requester.clone()).await, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; + let listener = update_listeners::polling_default(self.requester.clone()).await; + let error_handler = + LoggingErrorHandler::with_custom_text("An error from the update listener"); + + self.dispatch_with_listener(listener, error_handler).await; } /// Starts your bot with custom `update_listener` and @@ -288,22 +284,14 @@ where { use ShutdownState::*; - const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); - - // FIXME: replace this by just Duration::ZERO once 1.53 will be released - const DZERO: Duration = Duration::from_secs(0); - - let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); - - // FIXME: replace this by just saturating_add once 1.53 will be released - let shutdown_check_timeout = shutdown_check_timeout - .checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT) - .unwrap_or(shutdown_check_timeout); - + let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let mut stop_token = Some(update_listener.stop_token()); - if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) { - panic!("Dispatching is already running"); + if let Err(actual) = self.shutdown_state.compare_exchange(IsntRunning, Running) { + unreachable!( + "Dispatching is already running: expected `IsntRunning` state, found `{:?}`", + actual + ); } { @@ -327,23 +315,7 @@ where } } - // Drop all senders, so handlers can stop - self.messages_queue.take(); - self.edited_messages_queue.take(); - self.channel_posts_queue.take(); - self.edited_channel_posts_queue.take(); - self.inline_queries_queue.take(); - self.chosen_inline_results_queue.take(); - self.callback_queries_queue.take(); - self.shipping_queries_queue.take(); - self.pre_checkout_queries_queue.take(); - self.polls_queue.take(); - self.poll_answers_queue.take(); - self.my_chat_members_queue.take(); - self.chat_members_queue.take(); - - // Wait untill all handlers process all updates - self.running_handlers.by_ref().for_each(|_| async {}).await; + self.wait_for_handlers().await; if let ShuttingDown = self.shutdown_state.load() { // Stopped because of a `shutdown` call. @@ -462,8 +434,32 @@ where } } } + + async fn wait_for_handlers(&mut self) { + log::debug!("Waiting for handlers to finish"); + + // Drop all senders, so handlers can stop + self.messages_queue.take(); + self.edited_messages_queue.take(); + self.channel_posts_queue.take(); + self.edited_channel_posts_queue.take(); + self.inline_queries_queue.take(); + self.chosen_inline_results_queue.take(); + self.callback_queries_queue.take(); + self.shipping_queries_queue.take(); + self.pre_checkout_queries_queue.take(); + self.polls_queue.take(); + self.poll_answers_queue.take(); + self.my_chat_members_queue.take(); + self.chat_members_queue.take(); + + // Wait untill all handlers finish + self.running_handlers.by_ref().for_each(|_| async {}).await; + } } +/// A token which can be used to shutdown dispatcher. +#[derive(Clone)] pub struct ShutdownToken { shutdown_state: Arc, shutdown_notify_back: Arc, @@ -481,6 +477,7 @@ impl ShutdownToken { } } +/// Error occured while trying to shutdown dispatcher. #[derive(Debug)] pub enum ShutdownError { IsntRunning, @@ -511,7 +508,14 @@ impl AtomicShutdownState { } } +impl Default for AtomicShutdownState { + fn default() -> Self { + Self { inner: AtomicU8::new(ShutdownState::IsntRunning as _) } + } +} + #[repr(u8)] +#[derive(Debug)] enum ShutdownState { Running, ShuttingDown, @@ -533,6 +537,18 @@ impl ShutdownState { } } +fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { + const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); + + // FIXME: replace this by just Duration::ZERO once 1.53 will be released + const DZERO: Duration = Duration::from_secs(0); + + let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); + + // FIXME: replace this by just saturating_add once 1.53 will be released + shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) +} + fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { use ShutdownState::*;