Add Dispatcher::shutdown function

This function allows to gracefuly shutdown dispatching.
This commit is contained in:
Waffle 2021-05-18 18:04:55 +03:00
parent eae2bced6d
commit 41a95079b2
2 changed files with 252 additions and 119 deletions

View file

@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Storage::get_dialogue` to obtain a dialogue indexed by a chat ID.
- `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`.
- `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`.
- `Dispatcher::shutdown` function.
### Changed

View file

@ -4,16 +4,27 @@ use crate::{
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
};
use futures::StreamExt;
use std::{fmt::Debug, sync::Arc};
use core::panic;
use futures::{Future, StreamExt};
use std::{
fmt::Debug,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
};
use teloxide_core::{
requests::Requester,
types::{
CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll,
PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind,
PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind,
},
};
use tokio::sync::mpsc;
use tokio::{
sync::{mpsc, Notify},
time::timeout,
};
type Tx<Upd, R> = Option<mpsc::UnboundedSender<UpdateWithCx<Upd, R>>>;
@ -63,6 +74,9 @@ pub struct Dispatcher<R> {
poll_answers_queue: Tx<R, PollAnswer>,
my_chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_members_queue: Tx<R, ChatMemberUpdated>,
shutdown_state: AtomicShutdownState,
shutdown_notify_back: Notify,
}
impl<R> Dispatcher<R>
@ -87,6 +101,10 @@ where
poll_answers_queue: None,
my_chat_members_queue: None,
chat_members_queue: None,
shutdown_state: AtomicShutdownState {
inner: AtomicU8::new(ShutdownState::IsntRunning as _),
},
shutdown_notify_back: Notify::new(),
}
}
@ -251,125 +269,239 @@ where
ListenerE: Debug,
R: Requester + Clone,
{
update_listener
.as_stream()
.for_each(move |update| {
let update_listener_error_handler = Arc::clone(&update_listener_error_handler);
use ShutdownState::*;
async move {
log::trace!("Dispatcher received an update: {:?}", update);
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1);
let update = match update {
Ok(update) => update,
Err(error) => {
Arc::clone(&update_listener_error_handler).handle_error(error).await;
return;
}
};
let shutdown_check_timeout =
update_listener.timeout_hint().unwrap_or(Duration::ZERO) + MIN_SHUTDOWN_CHECK_TIMEOUT;
match update.kind {
UpdateKind::Message(message) => {
send!(
&self.requester,
&self.messages_queue,
message,
UpdateKind::Message
);
}
UpdateKind::EditedMessage(message) => {
send!(
&self.requester,
&self.edited_messages_queue,
message,
UpdateKind::EditedMessage
);
}
UpdateKind::ChannelPost(post) => {
send!(
&self.requester,
&self.channel_posts_queue,
post,
UpdateKind::ChannelPost
);
}
UpdateKind::EditedChannelPost(post) => {
send!(
&self.requester,
&self.edited_channel_posts_queue,
post,
UpdateKind::EditedChannelPost
);
}
UpdateKind::InlineQuery(query) => {
send!(
&self.requester,
&self.inline_queries_queue,
query,
UpdateKind::InlineQuery
);
}
UpdateKind::ChosenInlineResult(result) => {
send!(
&self.requester,
&self.chosen_inline_results_queue,
result,
UpdateKind::ChosenInlineResult
);
}
UpdateKind::CallbackQuery(query) => {
send!(
&self.requester,
&self.callback_queries_queue,
query,
UpdateKind::CallbackQuer
);
}
UpdateKind::ShippingQuery(query) => {
send!(
&self.requester,
&self.shipping_queries_queue,
query,
UpdateKind::ShippingQuery
);
}
UpdateKind::PreCheckoutQuery(query) => {
send!(
&self.requester,
&self.pre_checkout_queries_queue,
query,
UpdateKind::PreCheckoutQuery
);
}
UpdateKind::Poll(poll) => {
send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll);
}
UpdateKind::PollAnswer(answer) => {
send!(
&self.requester,
&self.poll_answers_queue,
answer,
UpdateKind::PollAnswer
);
}
UpdateKind::MyChatMember(chat_member_updated) => {
send!(
&self.requester,
&self.my_chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
UpdateKind::ChatMember(chat_member_updated) => {
send!(
&self.requester,
&self.chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) {
panic!("Dispatching is already running");
}
{
let stream = update_listener.as_stream();
tokio::pin!(stream);
loop {
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
match upd {
None => break,
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
}
}
})
.await
if let ShuttingDown = self.shutdown_state.load() {
break;
}
}
}
update_listener.stop();
update_listener
.as_stream()
.for_each(|upd| self.process_update(upd, &update_listener_error_handler))
.await;
if let ShuttingDown = self.shutdown_state.load() {
// Stopped because of a `shutdown` call.
// Notify `shutdown`s that we finished
self.shutdown_notify_back.notify_waiters();
}
self.shutdown_state.store(IsntRunning);
}
/// Tries shutting down dispatching.
///
/// Returns error if this dispather isn't dispathing at the moment.
///
/// If you don't need to wait for shutdown returned future can be ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, ShutdownError> {
use ShutdownState::*;
let res = self.shutdown_state.compare_exchange(Running, ShuttingDown);
match res {
Ok(_) | Err(ShuttingDown) => {
Ok(async move { self.shutdown_notify_back.notified().await })
}
Err(IsntRunning) => return Err(ShutdownError::IsntRunning),
Err(Running) => unreachable!(),
}
}
async fn process_update<ListenerE, Eh>(
&self,
update: Result<Update, ListenerE>,
update_listener_error_handler: &Arc<Eh>,
) where
R: Requester + Clone,
Eh: ErrorHandler<ListenerE>,
ListenerE: Debug,
{
{
log::trace!("Dispatcher received an update: {:?}", update);
let update = match update {
Ok(update) => update,
Err(error) => {
Arc::clone(update_listener_error_handler).handle_error(error).await;
return;
}
};
match update.kind {
UpdateKind::Message(message) => {
send!(&self.requester, &self.messages_queue, message, UpdateKind::Message);
}
UpdateKind::EditedMessage(message) => {
send!(
&self.requester,
&self.edited_messages_queue,
message,
UpdateKind::EditedMessage
);
}
UpdateKind::ChannelPost(post) => {
send!(
&self.requester,
&self.channel_posts_queue,
post,
UpdateKind::ChannelPost
);
}
UpdateKind::EditedChannelPost(post) => {
send!(
&self.requester,
&self.edited_channel_posts_queue,
post,
UpdateKind::EditedChannelPost
);
}
UpdateKind::InlineQuery(query) => {
send!(
&self.requester,
&self.inline_queries_queue,
query,
UpdateKind::InlineQuery
);
}
UpdateKind::ChosenInlineResult(result) => {
send!(
&self.requester,
&self.chosen_inline_results_queue,
result,
UpdateKind::ChosenInlineResult
);
}
UpdateKind::CallbackQuery(query) => {
send!(
&self.requester,
&self.callback_queries_queue,
query,
UpdateKind::CallbackQuer
);
}
UpdateKind::ShippingQuery(query) => {
send!(
&self.requester,
&self.shipping_queries_queue,
query,
UpdateKind::ShippingQuery
);
}
UpdateKind::PreCheckoutQuery(query) => {
send!(
&self.requester,
&self.pre_checkout_queries_queue,
query,
UpdateKind::PreCheckoutQuery
);
}
UpdateKind::Poll(poll) => {
send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll);
}
UpdateKind::PollAnswer(answer) => {
send!(
&self.requester,
&self.poll_answers_queue,
answer,
UpdateKind::PollAnswer
);
}
UpdateKind::MyChatMember(chat_member_updated) => {
send!(
&self.requester,
&self.my_chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
UpdateKind::ChatMember(chat_member_updated) => {
send!(
&self.requester,
&self.chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
}
}
}
}
#[derive(Debug)]
pub enum ShutdownError {
IsntRunning,
}
struct AtomicShutdownState {
inner: AtomicU8,
}
impl AtomicShutdownState {
fn load(&self) -> ShutdownState {
ShutdownState::from_u8(self.inner.load(Ordering::SeqCst))
}
fn store(&self, new: ShutdownState) {
self.inner.store(new as _, Ordering::SeqCst)
}
fn compare_exchange(
&self,
current: ShutdownState,
new: ShutdownState,
) -> Result<ShutdownState, ShutdownState> {
self.inner
.compare_exchange(current as _, new as _, Ordering::SeqCst, Ordering::SeqCst)
.map(ShutdownState::from_u8)
.map_err(ShutdownState::from_u8)
}
}
#[repr(u8)]
enum ShutdownState {
Running,
ShuttingDown,
IsntRunning,
}
impl ShutdownState {
fn from_u8(n: u8) -> Self {
const RUNNING: u8 = ShutdownState::Running as u8;
const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8;
const ISNT_RUNNING: u8 = ShutdownState::IsntRunning as u8;
match n {
RUNNING => ShutdownState::Running,
SHUTTING_DOWN => ShutdownState::ShuttingDown,
ISNT_RUNNING => ShutdownState::IsntRunning,
_ => unreachable!(),
}
}
}