From aeca45a579d70e14fe48efa439943be43f824eb3 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 14:46:45 +0300 Subject: [PATCH 01/28] Refactor `UpdateListner` trait Instead of `Stream` super trait we now require `for<'a> AsUpdateStream<'a, E'>` as a super trait. `AsUpdateStream` in turn provides `as_stream` function which takes `&mut self` and returns `Self::Stream` (the trait is needed to workaround lack of GAT). This patch also adds a `stop` function that allows stopping the listener. Since `UpdateListner` now isn't `Stream` and required methods, it's blanked implementation for streams was removed. `polling` and `polling_default` functions now also require `R: 'static`. --- CHANGELOG.md | 7 ++ src/dispatching/dispatcher.rs | 5 +- src/dispatching/update_listeners.rs | 180 +++++++++++++++++++++++++--- 3 files changed, 171 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63def763..2ae661dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Return an error from `Storage::remove_dialogue` if a dialogue does not exist. - Require `D: Clone` in `dialogues_repl(_with_listener)` and `InMemStorage`. - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). + - `polling` and `polling_default` now require `R: 'static` + - Refactor `UpdateListener` trait: + - Add a `stop` function that allows stopping the listener. + - Remove blanked implementation. + - Remove `Stream` from super traits. + - Add `AsUpdateStream` to super traits. + - Add an `AsUpdateStream` trait that allows turning implementors into streams of updates (GAT workaround). ### Fixed diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 93bf8a7e..e0a8aa4d 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -243,7 +243,7 @@ where /// `update_listener_error_handler`. pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( &'a self, - update_listener: UListener, + mut update_listener: UListener, update_listener_error_handler: Arc, ) where UListener: UpdateListener + 'a, @@ -251,9 +251,8 @@ where ListenerE: Debug, R: Requester + Clone, { - let update_listener = Box::pin(update_listener); - update_listener + .as_stream() .for_each(move |update| { let update_listener_error_handler = Arc::clone(&update_listener_error_handler); diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index bdece104..b577b9ab 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -111,11 +111,50 @@ use teloxide_core::{ types::{AllowedUpdate, SemiparsedVec, Update}, }; -/// A generic update listener. -pub trait UpdateListener: Stream> { - // TODO: add some methods here (.shutdown(), etc). +/// An update listener. +/// +/// Implementors of this trait allow getting updates from Telegram. +/// +/// Currently Telegram has 2 ways of getting updates -- [polling] and +/// [webhooks]. Currently, only the former one is implemented (see [`polling`] +/// and [`polling_default`]) +/// +/// Some functions of this trait are located in the supertrait +/// ([`AsUpdateStream`]), see also: +/// - [`Stream`] +/// - [`as_stream`] +/// +/// [polling]: self#long-polling +/// [webhooks]: self#webhooks +/// [`Stream`]: AsUpdateStream::Stream +/// [`as_stream`]: AsUpdateStream::as_stream +pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { + /// Stop listening for updates. + /// + /// This function is not guaranteed to have an immidiate effect. That is + /// some listners can return updates even after [`stop`] is called (e.g.: + /// because of buffering). + /// + /// [`stop`]: UpdateListener::stop + /// + /// Implementors of this function are encouraged to stop listening for + /// updates as soon as possible and return `None` from the update stream as + /// soon as all cached updates are returned. + fn stop(&mut self); +} + +/// [`UpdateListener`]'s supertrait/extension. +/// +/// This trait is a workaround to not require GAT. +pub trait AsUpdateStream<'a, E> { + /// Stream of updates from Telegram. + type Stream: Stream> + 'a; + + /// Creates the update [`Stream`]. + /// + /// [`Stream`]: AsUpdateStream::Stream + fn as_stream(&'a mut self) -> Self::Stream; } -impl UpdateListener for S where S: Stream> {} /// Returns a long polling update listener with `timeout` of 10 seconds. /// @@ -126,7 +165,7 @@ impl UpdateListener for S where S: Stream> {} /// This function will automatically delete a webhook if it was set up. pub async fn polling_default(requester: R) -> impl UpdateListener where - R: Requester, + R: Requester + 'static, ::GetUpdatesFaultTolerant: Send, { delete_webhook_if_setup(&requester).await; @@ -152,19 +191,58 @@ pub fn polling( allowed_updates: Option>, ) -> impl UpdateListener where - R: Requester, + R: Requester + 'static, ::GetUpdatesFaultTolerant: Send, { - let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); + enum RunningState { + Polling, + Stopping, + Stopped, + } + + struct State { + bot: B, + timeout: Option, + limit: Option, + allowed_updates: Option>, + offset: i32, + run_state: RunningState, + } + + fn stream(st: &mut State) -> impl Stream> + '_ + where + B: Requester, + { + stream::unfold(st, move |state| async move { + let State { timeout, limit, allowed_updates, bot, offset, run_state, .. } = &mut *state; + + match run_state { + RunningState::Polling => {} + RunningState::Stopped => return None, + RunningState::Stopping => { + let mut req = bot.get_updates_fault_tolerant(); + + let payload = &mut req.payload_mut().0; + payload.offset = Some(*offset); + payload.timeout = *timeout; + payload.limit = Some(1); + payload.allowed_updates = allowed_updates.take(); + + return match req.send().await { + Ok(_) => { + *run_state = RunningState::Stopped; + None + } + Err(err) => Some((stream::iter(vec![Err(err)]), state)), + }; + } + } - stream::unfold( - (allowed_updates, requester, 0), - move |(mut allowed_updates, bot, mut offset)| async move { let mut req = bot.get_updates_fault_tolerant(); let payload = &mut req.payload_mut().0; - payload.offset = Some(offset); - payload.timeout = timeout; - payload.limit = limit; + payload.offset = Some(*offset); + payload.timeout = *timeout; + payload.limit = *limit; payload.allowed_updates = allowed_updates.take(); let updates = match req.send().await { @@ -181,7 +259,7 @@ where .expect("update_id must be i32"), }; - offset = id + 1; + *offset = id + 1; } for update in &updates { @@ -200,10 +278,27 @@ where } }; - Some((stream::iter(updates), (allowed_updates, bot, offset))) - }, - ) - .flatten() + Some((stream::iter(updates), state)) + }) + .flatten() + } + + let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); + + let state = State { + bot: requester, + timeout, + limit, + allowed_updates, + offset: 0, + run_state: RunningState::Polling, + }; + + let stop = assert_stop_fn(|st: &mut State<_>| { + st.run_state = RunningState::Stopping; + }); + + StatefulListner { state, stream, stop } } async fn delete_webhook_if_setup(requester: &R) @@ -226,3 +321,52 @@ where } } } + +/// A listner created from `state` and `stream`/`stop` functions. +struct StatefulListner { + /// The state of the listner. + state: St, + + /// Function used as `AsUpdateStream::as_stream`. + /// + /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by + /// `&mut`. + stream: Sf, + + /// Function used as `UpdateListner::stop`. + /// + /// Must be of type `for<'a> &'a mut St`. + stop: Option, +} + +impl<'a, St, Sf, F, Strm, E> AsUpdateStream<'a, E> for StatefulListner +where + (St, Strm): 'a, + Sf: FnMut(&'a mut St) -> Strm, + Strm: Stream>, +{ + type Stream = Strm; + + fn as_stream(&'a mut self) -> Self::Stream { + (self.stream)(&mut self.state) + } +} + +impl UpdateListener for StatefulListner +where + Self: for<'a> AsUpdateStream<'a, E>, + F: FnOnce(&mut St), +{ + fn stop(&mut self) { + self.stop.take().map(|stop| stop(&mut self.state)); + } +} + +/// Assert (at compile tume) that `f` is fine as a stop-function (closure +/// lifetime inference workaround). +fn assert_stop_fn(f: F) -> Option +where + F: FnOnce(&mut St), +{ + Some(f) +} From c288a540b9890bbd3c45d132f63e939f26cd1513 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 17:09:20 +0300 Subject: [PATCH 02/28] Store fetched updates in `polling::State` This way updates aren't lost when the stream is dropped. --- src/dispatching/update_listeners.rs | 131 +++++++++++++++++----------- 1 file changed, 81 insertions(+), 50 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index b577b9ab..e88d2635 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -105,7 +105,7 @@ use futures::{stream, Stream, StreamExt}; -use std::{convert::TryInto, time::Duration}; +use std::{convert::TryInto, iter, time::Duration, vec}; use teloxide_core::{ requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, SemiparsedVec, Update}, @@ -153,6 +153,8 @@ pub trait AsUpdateStream<'a, E> { /// Creates the update [`Stream`]. /// /// [`Stream`]: AsUpdateStream::Stream + /// + /// Returned stream **must not** be stateful. State must be kept in `self`. fn as_stream(&'a mut self) -> Self::Stream; } @@ -200,13 +202,28 @@ where Stopped, } - struct State { + struct State { bot: B, timeout: Option, limit: Option, allowed_updates: Option>, offset: i32, run_state: RunningState, + + // Updates fetched last time. + // + // We need to store them here so we can drop stream without loosing state. + fetched: Option< + iter::Map< + iter::FilterMap< + vec::IntoIter>, + fn( + Result, + ) -> std::option::Option, + >, + fn(Update) -> Result, + >, + >, } fn stream(st: &mut State) -> impl Stream> + '_ @@ -214,71 +231,84 @@ where B: Requester, { stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, run_state, .. } = &mut *state; + let State { timeout, limit, allowed_updates, bot, offset, run_state, fetched, .. } = + &mut *state; - match run_state { - RunningState::Polling => {} - RunningState::Stopped => return None, - RunningState::Stopping => { - let mut req = bot.get_updates_fault_tolerant(); + let fetched_is_none_or_empty = fetched + .as_ref() + .map(|f| matches!(f.size_hint(), (_lower, Some(0)))) + .unwrap_or(true); - let payload = &mut req.payload_mut().0; - payload.offset = Some(*offset); - payload.timeout = *timeout; - payload.limit = Some(1); - payload.allowed_updates = allowed_updates.take(); + if fetched_is_none_or_empty { + match run_state { + RunningState::Polling => {} + RunningState::Stopped => return None, + RunningState::Stopping => { + let mut req = bot.get_updates_fault_tolerant(); - return match req.send().await { - Ok(_) => { - *run_state = RunningState::Stopped; - None - } - Err(err) => Some((stream::iter(vec![Err(err)]), state)), - }; - } - } + let payload = &mut req.payload_mut().0; + payload.offset = Some(*offset); + payload.timeout = *timeout; + payload.limit = Some(1); + payload.allowed_updates = allowed_updates.take(); - let mut req = bot.get_updates_fault_tolerant(); - let payload = &mut req.payload_mut().0; - payload.offset = Some(*offset); - payload.timeout = *timeout; - payload.limit = *limit; - payload.allowed_updates = allowed_updates.take(); - - let updates = match req.send().await { - Err(err) => vec![Err(err)], - Ok(SemiparsedVec(updates)) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), + return match req.send().await { + Ok(_) => { + *run_state = RunningState::Stopped; + None + } + Err(err) => Some((stream::iter(Some(Err(err))), state)), }; - - *offset = id + 1; } + } - for update in &updates { - if let Err((value, e)) = update { - log::error!( + let mut req = bot.get_updates_fault_tolerant(); + let payload = &mut req.payload_mut().0; + payload.offset = Some(*offset); + payload.timeout = *timeout; + payload.limit = *limit; + payload.allowed_updates = allowed_updates.take(); + + let updates = match req.send().await { + Err(err) => return Some((stream::iter(Some(Err(err))), state)), + Ok(SemiparsedVec(updates)) => { + // Set offset to the last update's id + 1 + if let Some(upd) = updates.last() { + let id: i32 = match upd { + Ok(ok) => ok.id, + Err((value, _)) => value["update_id"] + .as_i64() + .expect("The 'update_id' field must always exist in Update") + .try_into() + .expect("update_id must be i32"), + }; + + *offset = id + 1; + } + + for update in &updates { + if let Err((value, e)) = update { + log::error!( "Cannot parse an update.\nError: {:?}\nValue: {}\n\ This is a bug in teloxide-core, please open an issue here: \ https://github.com/teloxide/teloxide-core/issues.", e, value ); + } } + + updates + .into_iter() + .filter_map(Result::ok as fn(_) -> _) + .map(Ok as fn(_) -> _) } + }; - updates.into_iter().filter_map(Result::ok).map(Ok).collect::>() - } - }; + *fetched = Some(updates); + } - Some((stream::iter(updates), state)) + Some((stream::iter(fetched.as_mut().and_then(|f| f.next())), state)) }) .flatten() } @@ -292,6 +322,7 @@ where allowed_updates, offset: 0, run_state: RunningState::Polling, + fetched: None, }; let stop = assert_stop_fn(|st: &mut State<_>| { From 9f5a222ed7ae4e1ab7bb4f6357253b0710e500a4 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 17:29:45 +0300 Subject: [PATCH 03/28] Add `UpdateListener::timeout_hint` --- src/dispatching/update_listeners.rs | 49 ++++++++++++++++------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index e88d2635..cc648ad4 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -141,6 +141,11 @@ pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { /// updates as soon as possible and return `None` from the update stream as /// soon as all cached updates are returned. fn stop(&mut self); + + /// Timeout duration hint. + fn timeout_hint(&self) -> Option { + None + } } /// [`UpdateListener`]'s supertrait/extension. @@ -313,11 +318,9 @@ where .flatten() } - let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); - let state = State { bot: requester, - timeout, + timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), limit, allowed_updates, offset: 0, @@ -325,11 +328,11 @@ where fetched: None, }; - let stop = assert_stop_fn(|st: &mut State<_>| { - st.run_state = RunningState::Stopping; - }); + let stop = Some(|st: &mut State<_>| st.run_state = RunningState::Stopping); - StatefulListner { state, stream, stop } + let timeout_hint = Some(move |_: &State<_>| timeout); + + StatefulListner { state, stream, stop, timeout_hint } } async fn delete_webhook_if_setup(requester: &R) @@ -354,7 +357,7 @@ where } /// A listner created from `state` and `stream`/`stop` functions. -struct StatefulListner { +struct StatefulListner { /// The state of the listner. state: St, @@ -362,18 +365,24 @@ struct StatefulListner { /// /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by /// `&mut`. - stream: Sf, + stream: Assf, /// Function used as `UpdateListner::stop`. /// /// Must be of type `for<'a> &'a mut St`. - stop: Option, + stop: Option, + + /// Function used as `UpdateListner::timeout_hint`. + /// + /// Must be of type `for<'a> &'a St -> Option` and callable by + /// `&`. + timeout_hint: Option, } -impl<'a, St, Sf, F, Strm, E> AsUpdateStream<'a, E> for StatefulListner +impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListner where (St, Strm): 'a, - Sf: FnMut(&'a mut St) -> Strm, + Assf: FnMut(&'a mut St) -> Strm, Strm: Stream>, { type Stream = Strm; @@ -383,21 +392,17 @@ where } } -impl UpdateListener for StatefulListner +impl UpdateListener for StatefulListner where Self: for<'a> AsUpdateStream<'a, E>, - F: FnOnce(&mut St), + Sf: FnOnce(&mut St), + Thf: Fn(&St) -> Option, { fn stop(&mut self) { self.stop.take().map(|stop| stop(&mut self.state)); } -} -/// Assert (at compile tume) that `f` is fine as a stop-function (closure -/// lifetime inference workaround). -fn assert_stop_fn(f: F) -> Option -where - F: FnOnce(&mut St), -{ - Some(f) + fn timeout_hint(&self) -> Option { + self.timeout_hint.as_ref().and_then(|f| f(&self.state)) + } } From eae2bced6df1a0378f041388e17ae4579263a16e Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 17:38:47 +0300 Subject: [PATCH 04/28] Simplify `polling` a little bit --- src/dispatching/update_listeners.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index cc648ad4..602e8c19 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -107,6 +107,7 @@ use futures::{stream, Stream, StreamExt}; use std::{convert::TryInto, iter, time::Duration, vec}; use teloxide_core::{ + payloads::GetUpdates, requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, SemiparsedVec, Update}, }; @@ -251,11 +252,12 @@ where RunningState::Stopping => { let mut req = bot.get_updates_fault_tolerant(); - let payload = &mut req.payload_mut().0; - payload.offset = Some(*offset); - payload.timeout = *timeout; - payload.limit = Some(1); - payload.allowed_updates = allowed_updates.take(); + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: *timeout, + limit: Some(1), + allowed_updates: allowed_updates.take(), + }; return match req.send().await { Ok(_) => { @@ -268,11 +270,12 @@ where } let mut req = bot.get_updates_fault_tolerant(); - let payload = &mut req.payload_mut().0; - payload.offset = Some(*offset); - payload.timeout = *timeout; - payload.limit = *limit; - payload.allowed_updates = allowed_updates.take(); + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: *timeout, + limit: *limit, + allowed_updates: allowed_updates.take(), + }; let updates = match req.send().await { Err(err) => return Some((stream::iter(Some(Err(err))), state)), From 41a95079b2bd40ccdd996a55ea105699b6075926 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 18:04:55 +0300 Subject: [PATCH 05/28] Add Dispatcher::shutdown function This function allows to gracefuly shutdown dispatching. --- CHANGELOG.md | 1 + src/dispatching/dispatcher.rs | 370 +++++++++++++++++++++++----------- 2 files changed, 252 insertions(+), 119 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ae661dc..5d4310c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Storage::get_dialogue` to obtain a dialogue indexed by a chat ID. - `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`. - `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`. + - `Dispatcher::shutdown` function. ### Changed diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index e0a8aa4d..ea5d5844 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -4,16 +4,27 @@ use crate::{ }, error_handlers::{ErrorHandler, LoggingErrorHandler}, }; -use futures::StreamExt; -use std::{fmt::Debug, sync::Arc}; +use core::panic; +use futures::{Future, StreamExt}; +use std::{ + fmt::Debug, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + time::Duration, +}; use teloxide_core::{ requests::Requester, types::{ CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, - PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind, + PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, }, }; -use tokio::sync::mpsc; +use tokio::{ + sync::{mpsc, Notify}, + time::timeout, +}; type Tx = Option>>; @@ -63,6 +74,9 @@ pub struct Dispatcher { poll_answers_queue: Tx, my_chat_members_queue: Tx, chat_members_queue: Tx, + + shutdown_state: AtomicShutdownState, + shutdown_notify_back: Notify, } impl Dispatcher @@ -87,6 +101,10 @@ where poll_answers_queue: None, my_chat_members_queue: None, chat_members_queue: None, + shutdown_state: AtomicShutdownState { + inner: AtomicU8::new(ShutdownState::IsntRunning as _), + }, + shutdown_notify_back: Notify::new(), } } @@ -251,125 +269,239 @@ where ListenerE: Debug, R: Requester + Clone, { - update_listener - .as_stream() - .for_each(move |update| { - let update_listener_error_handler = Arc::clone(&update_listener_error_handler); + use ShutdownState::*; - async move { - log::trace!("Dispatcher received an update: {:?}", update); + const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); - let update = match update { - Ok(update) => update, - Err(error) => { - Arc::clone(&update_listener_error_handler).handle_error(error).await; - return; - } - }; + let shutdown_check_timeout = + update_listener.timeout_hint().unwrap_or(Duration::ZERO) + MIN_SHUTDOWN_CHECK_TIMEOUT; - match update.kind { - UpdateKind::Message(message) => { - send!( - &self.requester, - &self.messages_queue, - message, - UpdateKind::Message - ); - } - UpdateKind::EditedMessage(message) => { - send!( - &self.requester, - &self.edited_messages_queue, - message, - UpdateKind::EditedMessage - ); - } - UpdateKind::ChannelPost(post) => { - send!( - &self.requester, - &self.channel_posts_queue, - post, - UpdateKind::ChannelPost - ); - } - UpdateKind::EditedChannelPost(post) => { - send!( - &self.requester, - &self.edited_channel_posts_queue, - post, - UpdateKind::EditedChannelPost - ); - } - UpdateKind::InlineQuery(query) => { - send!( - &self.requester, - &self.inline_queries_queue, - query, - UpdateKind::InlineQuery - ); - } - UpdateKind::ChosenInlineResult(result) => { - send!( - &self.requester, - &self.chosen_inline_results_queue, - result, - UpdateKind::ChosenInlineResult - ); - } - UpdateKind::CallbackQuery(query) => { - send!( - &self.requester, - &self.callback_queries_queue, - query, - UpdateKind::CallbackQuer - ); - } - UpdateKind::ShippingQuery(query) => { - send!( - &self.requester, - &self.shipping_queries_queue, - query, - UpdateKind::ShippingQuery - ); - } - UpdateKind::PreCheckoutQuery(query) => { - send!( - &self.requester, - &self.pre_checkout_queries_queue, - query, - UpdateKind::PreCheckoutQuery - ); - } - UpdateKind::Poll(poll) => { - send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll); - } - UpdateKind::PollAnswer(answer) => { - send!( - &self.requester, - &self.poll_answers_queue, - answer, - UpdateKind::PollAnswer - ); - } - UpdateKind::MyChatMember(chat_member_updated) => { - send!( - &self.requester, - &self.my_chat_members_queue, - chat_member_updated, - UpdateKind::MyChatMember - ); - } - UpdateKind::ChatMember(chat_member_updated) => { - send!( - &self.requester, - &self.chat_members_queue, - chat_member_updated, - UpdateKind::MyChatMember - ); - } + if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) { + panic!("Dispatching is already running"); + } + + { + let stream = update_listener.as_stream(); + tokio::pin!(stream); + + loop { + if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await { + match upd { + None => break, + Some(upd) => self.process_update(upd, &update_listener_error_handler).await, } } - }) - .await + + if let ShuttingDown = self.shutdown_state.load() { + break; + } + } + } + + update_listener.stop(); + + update_listener + .as_stream() + .for_each(|upd| self.process_update(upd, &update_listener_error_handler)) + .await; + + if let ShuttingDown = self.shutdown_state.load() { + // Stopped because of a `shutdown` call. + + // Notify `shutdown`s that we finished + self.shutdown_notify_back.notify_waiters(); + } + + self.shutdown_state.store(IsntRunning); + } + + /// Tries shutting down dispatching. + /// + /// Returns error if this dispather isn't dispathing at the moment. + /// + /// If you don't need to wait for shutdown returned future can be ignored. + pub fn shutdown(&self) -> Result + '_, ShutdownError> { + use ShutdownState::*; + + let res = self.shutdown_state.compare_exchange(Running, ShuttingDown); + + match res { + Ok(_) | Err(ShuttingDown) => { + Ok(async move { self.shutdown_notify_back.notified().await }) + } + Err(IsntRunning) => return Err(ShutdownError::IsntRunning), + Err(Running) => unreachable!(), + } + } + + async fn process_update( + &self, + update: Result, + update_listener_error_handler: &Arc, + ) where + R: Requester + Clone, + Eh: ErrorHandler, + ListenerE: Debug, + { + { + log::trace!("Dispatcher received an update: {:?}", update); + + let update = match update { + Ok(update) => update, + Err(error) => { + Arc::clone(update_listener_error_handler).handle_error(error).await; + return; + } + }; + + match update.kind { + UpdateKind::Message(message) => { + send!(&self.requester, &self.messages_queue, message, UpdateKind::Message); + } + UpdateKind::EditedMessage(message) => { + send!( + &self.requester, + &self.edited_messages_queue, + message, + UpdateKind::EditedMessage + ); + } + UpdateKind::ChannelPost(post) => { + send!( + &self.requester, + &self.channel_posts_queue, + post, + UpdateKind::ChannelPost + ); + } + UpdateKind::EditedChannelPost(post) => { + send!( + &self.requester, + &self.edited_channel_posts_queue, + post, + UpdateKind::EditedChannelPost + ); + } + UpdateKind::InlineQuery(query) => { + send!( + &self.requester, + &self.inline_queries_queue, + query, + UpdateKind::InlineQuery + ); + } + UpdateKind::ChosenInlineResult(result) => { + send!( + &self.requester, + &self.chosen_inline_results_queue, + result, + UpdateKind::ChosenInlineResult + ); + } + UpdateKind::CallbackQuery(query) => { + send!( + &self.requester, + &self.callback_queries_queue, + query, + UpdateKind::CallbackQuer + ); + } + UpdateKind::ShippingQuery(query) => { + send!( + &self.requester, + &self.shipping_queries_queue, + query, + UpdateKind::ShippingQuery + ); + } + UpdateKind::PreCheckoutQuery(query) => { + send!( + &self.requester, + &self.pre_checkout_queries_queue, + query, + UpdateKind::PreCheckoutQuery + ); + } + UpdateKind::Poll(poll) => { + send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll); + } + UpdateKind::PollAnswer(answer) => { + send!( + &self.requester, + &self.poll_answers_queue, + answer, + UpdateKind::PollAnswer + ); + } + UpdateKind::MyChatMember(chat_member_updated) => { + send!( + &self.requester, + &self.my_chat_members_queue, + chat_member_updated, + UpdateKind::MyChatMember + ); + } + UpdateKind::ChatMember(chat_member_updated) => { + send!( + &self.requester, + &self.chat_members_queue, + chat_member_updated, + UpdateKind::MyChatMember + ); + } + } + } + } +} + +#[derive(Debug)] +pub enum ShutdownError { + IsntRunning, +} + +struct AtomicShutdownState { + inner: AtomicU8, +} + +impl AtomicShutdownState { + fn load(&self) -> ShutdownState { + ShutdownState::from_u8(self.inner.load(Ordering::SeqCst)) + } + + fn store(&self, new: ShutdownState) { + self.inner.store(new as _, Ordering::SeqCst) + } + + fn compare_exchange( + &self, + current: ShutdownState, + new: ShutdownState, + ) -> Result { + self.inner + .compare_exchange(current as _, new as _, Ordering::SeqCst, Ordering::SeqCst) + .map(ShutdownState::from_u8) + .map_err(ShutdownState::from_u8) + } +} + +#[repr(u8)] +enum ShutdownState { + Running, + ShuttingDown, + IsntRunning, +} + +impl ShutdownState { + fn from_u8(n: u8) -> Self { + const RUNNING: u8 = ShutdownState::Running as u8; + const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8; + const ISNT_RUNNING: u8 = ShutdownState::IsntRunning as u8; + + match n { + RUNNING => ShutdownState::Running, + SHUTTING_DOWN => ShutdownState::ShuttingDown, + ISNT_RUNNING => ShutdownState::IsntRunning, + _ => unreachable!(), + } } } From f0de55ad5545e31d48510372254c227751877136 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 18:30:57 +0300 Subject: [PATCH 06/28] Add `Dispatcher::setup_ctrlc_handler` function. This function sets up `^C` handler which shuts down dispatching. --- CHANGELOG.md | 3 ++- Cargo.toml | 3 +++ src/dispatching/dispatcher.rs | 46 +++++++++++++++++++++++++++-------- src/features.txt | 1 + 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d4310c6..4a7b64c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`. - `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`. - `Dispatcher::shutdown` function. + - `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)). ### Changed @@ -21,7 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). - `polling` and `polling_default` now require `R: 'static` - Refactor `UpdateListener` trait: - - Add a `stop` function that allows stopping the listener. + - Add a `stop` function that allows stopping the listener ([issue 166](https://github.com/teloxide/teloxide/issues/166)). - Remove blanked implementation. - Remove `Stream` from super traits. - Add `AsUpdateStream` to super traits. diff --git a/Cargo.toml b/Cargo.toml index 509ca839..ec2eba2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ bincode-serializer = ["bincode"] frunk- = ["frunk"] macros = ["teloxide-macros"] +ctrlc_handler = ["tokio/signal"] + native-tls = ["teloxide-core/native-tls"] rustls = ["teloxide-core/rustls"] auto-send = ["teloxide-core/auto_send"] @@ -51,6 +53,7 @@ full = [ "bincode-serializer", "frunk", "macros", + "ctrlc_handler", "teloxide-core/full", "native-tls", "rustls", diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index ea5d5844..bf088394 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -75,7 +75,7 @@ pub struct Dispatcher { my_chat_members_queue: Tx, chat_members_queue: Tx, - shutdown_state: AtomicShutdownState, + shutdown_state: Arc, shutdown_notify_back: Notify, } @@ -101,9 +101,9 @@ where poll_answers_queue: None, my_chat_members_queue: None, chat_members_queue: None, - shutdown_state: AtomicShutdownState { + shutdown_state: Arc::new(AtomicShutdownState { inner: AtomicU8::new(ShutdownState::IsntRunning as _), - }, + }), shutdown_notify_back: Notify::new(), } } @@ -124,6 +124,25 @@ where Some(tx) } + /// Setup `^C` handler which [`shutdown`]s dispatching. + /// + /// [`shutdown`]: Dispatcher::shutdown + #[cfg(feature = "ctrlc_handler")] + #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] + pub fn setup_ctrlc_handler(self) -> Self { + let shutdown_state = Arc::clone(&self.shutdown_state); + tokio::spawn(async move { + loop { + tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); + + // If dispatcher wasn't running, then there is nothing to do + Self::shutdown_inner(&shutdown_state).ok(); + } + }); + + self + } + #[must_use] pub fn messages_handler(mut self, h: H) -> Self where @@ -293,6 +312,7 @@ where } if let ShuttingDown = self.shutdown_state.load() { + log::debug!("Start shutting down dispatching"); break; } } @@ -310,25 +330,31 @@ where // Notify `shutdown`s that we finished self.shutdown_notify_back.notify_waiters(); + log::debug!("Dispatching shut down"); + } else { + log::debug!("Dispatching stopped (listner returned `None`)"); } self.shutdown_state.store(IsntRunning); } - /// Tries shutting down dispatching. + /// Tries to shutdown dispatching. /// - /// Returns error if this dispather isn't dispathing at the moment. + /// Returns error if this dispather isn't dispatching at the moment. /// - /// If you don't need to wait for shutdown returned future can be ignored. + /// If you don't need to wait for shutdown, returned future can be ignored. pub fn shutdown(&self) -> Result + '_, ShutdownError> { + Self::shutdown_inner(&self.shutdown_state) + .map(|()| async move { self.shutdown_notify_back.notified().await }) + } + + fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { use ShutdownState::*; - let res = self.shutdown_state.compare_exchange(Running, ShuttingDown); + let res = shutdown_state.compare_exchange(Running, ShuttingDown); match res { - Ok(_) | Err(ShuttingDown) => { - Ok(async move { self.shutdown_notify_back.notified().await }) - } + Ok(_) | Err(ShuttingDown) => Ok(()), Err(IsntRunning) => return Err(ShutdownError::IsntRunning), Err(Running) => unreachable!(), } diff --git a/src/features.txt b/src/features.txt index c0fa8e20..ee738058 100644 --- a/src/features.txt +++ b/src/features.txt @@ -9,6 +9,7 @@ | `macros` | Re-exports macros from [`teloxide-macros`]. | | `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). | | `rustls` | Enables the [`rustls`] TLS implementation. | +| `ctrlc_handler` | Enables [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. | | `auto-send` | Enables the `AutoSend` bot adaptor. | | `cache-me` | Enables the `CacheMe` bot adaptor. | | `frunk` | Enables [`teloxide::utils::UpState`]. | From d745f9bdb1a117dd990efd6531e10baedd775108 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 18:42:02 +0300 Subject: [PATCH 07/28] Remove send! macro --- src/dispatching/dispatcher.rs | 170 ++++++++++++++-------------------- 1 file changed, 69 insertions(+), 101 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index bf088394..99551f8b 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -28,16 +28,6 @@ use tokio::{ type Tx = Option>>; -#[macro_use] -mod macros { - /// Pushes an update to a queue. - macro_rules! send { - ($requester:expr, $tx:expr, $update:expr, $variant:expr) => { - send($requester, $tx, $update, stringify!($variant)); - }; - } -} - fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx, update: Upd, variant: &'static str) where Upd: Debug, @@ -355,7 +345,7 @@ where match res { Ok(_) | Err(ShuttingDown) => Ok(()), - Err(IsntRunning) => return Err(ShutdownError::IsntRunning), + Err(IsntRunning) => Err(ShutdownError::IsntRunning), Err(Running) => unreachable!(), } } @@ -382,99 +372,77 @@ where match update.kind { UpdateKind::Message(message) => { - send!(&self.requester, &self.messages_queue, message, UpdateKind::Message); - } - UpdateKind::EditedMessage(message) => { - send!( - &self.requester, - &self.edited_messages_queue, - message, - UpdateKind::EditedMessage - ); - } - UpdateKind::ChannelPost(post) => { - send!( - &self.requester, - &self.channel_posts_queue, - post, - UpdateKind::ChannelPost - ); - } - UpdateKind::EditedChannelPost(post) => { - send!( - &self.requester, - &self.edited_channel_posts_queue, - post, - UpdateKind::EditedChannelPost - ); - } - UpdateKind::InlineQuery(query) => { - send!( - &self.requester, - &self.inline_queries_queue, - query, - UpdateKind::InlineQuery - ); - } - UpdateKind::ChosenInlineResult(result) => { - send!( - &self.requester, - &self.chosen_inline_results_queue, - result, - UpdateKind::ChosenInlineResult - ); - } - UpdateKind::CallbackQuery(query) => { - send!( - &self.requester, - &self.callback_queries_queue, - query, - UpdateKind::CallbackQuer - ); - } - UpdateKind::ShippingQuery(query) => { - send!( - &self.requester, - &self.shipping_queries_queue, - query, - UpdateKind::ShippingQuery - ); - } - UpdateKind::PreCheckoutQuery(query) => { - send!( - &self.requester, - &self.pre_checkout_queries_queue, - query, - UpdateKind::PreCheckoutQuery - ); + send(&self.requester, &self.messages_queue, message, "UpdateKind::Message") } + UpdateKind::EditedMessage(message) => send( + &self.requester, + &self.edited_messages_queue, + message, + "UpdateKind::EditedMessage", + ), + UpdateKind::ChannelPost(post) => send( + &self.requester, + &self.channel_posts_queue, + post, + "UpdateKind::ChannelPost", + ), + UpdateKind::EditedChannelPost(post) => send( + &self.requester, + &self.edited_channel_posts_queue, + post, + "UpdateKind::EditedChannelPost", + ), + UpdateKind::InlineQuery(query) => send( + &self.requester, + &self.inline_queries_queue, + query, + "UpdateKind::InlineQuery", + ), + UpdateKind::ChosenInlineResult(result) => send( + &self.requester, + &self.chosen_inline_results_queue, + result, + "UpdateKind::ChosenInlineResult", + ), + UpdateKind::CallbackQuery(query) => send( + &self.requester, + &self.callback_queries_queue, + query, + "UpdateKind::CallbackQuer", + ), + UpdateKind::ShippingQuery(query) => send( + &self.requester, + &self.shipping_queries_queue, + query, + "UpdateKind::ShippingQuery", + ), + UpdateKind::PreCheckoutQuery(query) => send( + &self.requester, + &self.pre_checkout_queries_queue, + query, + "UpdateKind::PreCheckoutQuery", + ), UpdateKind::Poll(poll) => { - send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll); - } - UpdateKind::PollAnswer(answer) => { - send!( - &self.requester, - &self.poll_answers_queue, - answer, - UpdateKind::PollAnswer - ); - } - UpdateKind::MyChatMember(chat_member_updated) => { - send!( - &self.requester, - &self.my_chat_members_queue, - chat_member_updated, - UpdateKind::MyChatMember - ); - } - UpdateKind::ChatMember(chat_member_updated) => { - send!( - &self.requester, - &self.chat_members_queue, - chat_member_updated, - UpdateKind::MyChatMember - ); + send(&self.requester, &self.polls_queue, poll, "UpdateKind::Poll") } + UpdateKind::PollAnswer(answer) => send( + &self.requester, + &self.poll_answers_queue, + answer, + "UpdateKind::PollAnswer", + ), + UpdateKind::MyChatMember(chat_member_updated) => send( + &self.requester, + &self.my_chat_members_queue, + chat_member_updated, + "UpdateKind::MyChatMember", + ), + UpdateKind::ChatMember(chat_member_updated) => send( + &self.requester, + &self.chat_members_queue, + chat_member_updated, + "UpdateKind::MyChatMember", + ), } } } From a5192a9ecb97f2ae0486c2b7f5e9b8a3fbf6d016 Mon Sep 17 00:00:00 2001 From: Waffle Date: Tue, 18 May 2021 18:42:33 +0300 Subject: [PATCH 08/28] Clippy --- src/dispatching/update_listeners.rs | 5 ++++- src/lib.rs | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 602e8c19..c6dc3e6a 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -219,6 +219,7 @@ where // Updates fetched last time. // // We need to store them here so we can drop stream without loosing state. + #[allow(clippy::type_complexity)] fetched: Option< iter::Map< iter::FilterMap< @@ -402,7 +403,9 @@ where Thf: Fn(&St) -> Option, { fn stop(&mut self) { - self.stop.take().map(|stop| stop(&mut self.state)); + if let Some(stop) = self.stop.take() { + stop(&mut self.state) + } } fn timeout_hint(&self) -> Option { diff --git a/src/lib.rs b/src/lib.rs index 7079bbce..a22f0731 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,6 +54,7 @@ // $ RUSTDOCFLAGS="--cfg docsrs -Znormalize-docs" cargo +nightly doc --open --all-features // ``` #![cfg_attr(all(docsrs, feature = "nightly"), feature(doc_cfg))] +#![allow(clippy::redundant_pattern_matching)] pub use dispatching::repls::{ commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl, From 00093664c70c14d89a115a16557213fc7c59e038 Mon Sep 17 00:00:00 2001 From: Waffle Date: Wed, 19 May 2021 10:19:41 +0300 Subject: [PATCH 09/28] Remove use of unstable feature (duration_zero) The feature has been stabilized in 1.53. --- src/dispatching/dispatcher.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 99551f8b..e538fa37 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -282,8 +282,11 @@ where const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); + // FIXME: replace this by just Duration::ZERO once 1.53 will be released + const DZERO: Duration = Duration::from_secs(0); + let shutdown_check_timeout = - update_listener.timeout_hint().unwrap_or(Duration::ZERO) + MIN_SHUTDOWN_CHECK_TIMEOUT; + update_listener.timeout_hint().unwrap_or(DZERO) + MIN_SHUTDOWN_CHECK_TIMEOUT; if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) { panic!("Dispatching is already running"); From a8306ec3ffdbb613aebf7f1e19fb8beff737a5a6 Mon Sep 17 00:00:00 2001 From: Waffle Lapkin Date: Fri, 21 May 2021 13:27:50 +0300 Subject: [PATCH 10/28] Apply suggestions from code review Co-authored-by: Hirrolot --- src/dispatching/update_listeners.rs | 4 ++-- src/features.txt | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index c6dc3e6a..0e56edd7 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -132,8 +132,8 @@ use teloxide_core::{ pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { /// Stop listening for updates. /// - /// This function is not guaranteed to have an immidiate effect. That is - /// some listners can return updates even after [`stop`] is called (e.g.: + /// This function is not guaranteed to have an immediate effect. That is + /// some listeners can return updates even after [`stop`] is called (e.g.: /// because of buffering). /// /// [`stop`]: UpdateListener::stop diff --git a/src/features.txt b/src/features.txt index ee738058..f6ef21ba 100644 --- a/src/features.txt +++ b/src/features.txt @@ -9,7 +9,7 @@ | `macros` | Re-exports macros from [`teloxide-macros`]. | | `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). | | `rustls` | Enables the [`rustls`] TLS implementation. | -| `ctrlc_handler` | Enables [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. | +| `ctrlc_handler` | Enables the [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. | | `auto-send` | Enables the `AutoSend` bot adaptor. | | `cache-me` | Enables the `CacheMe` bot adaptor. | | `frunk` | Enables [`teloxide::utils::UpState`]. | From a1624783461a9de2629180d1c636aa14777bf8cb Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 21 May 2021 23:58:46 +0300 Subject: [PATCH 11/28] Switch graceful shutdown to cancelation token-like system --- src/dispatching/dispatcher.rs | 21 ++- src/dispatching/mod.rs | 7 +- src/dispatching/stop_token.rs | 74 ++++++++++ src/dispatching/update_listeners.rs | 202 +++++++++++++--------------- 4 files changed, 179 insertions(+), 125 deletions(-) create mode 100644 src/dispatching/stop_token.rs diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index e538fa37..362d4157 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,6 +1,8 @@ use crate::{ dispatching::{ - update_listeners, update_listeners::UpdateListener, DispatcherHandler, UpdateWithCx, + stop_token::StopToken, + update_listeners::{self, UpdateListener}, + DispatcherHandler, UpdateWithCx, }, error_handlers::{ErrorHandler, LoggingErrorHandler}, }; @@ -288,6 +290,8 @@ where let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO) + MIN_SHUTDOWN_CHECK_TIMEOUT; + let mut stop_token = Some(update_listener.stop_token()); + if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) { panic!("Dispatching is already running"); } @@ -305,19 +309,14 @@ where } if let ShuttingDown = self.shutdown_state.load() { - log::debug!("Start shutting down dispatching"); - break; + if let Some(token) = stop_token.take() { + log::debug!("Start shutting down dispatching"); + token.stop(); + } } } } - update_listener.stop(); - - update_listener - .as_stream() - .for_each(|upd| self.process_update(upd, &update_listener_error_handler)) - .await; - if let ShuttingDown = self.shutdown_state.load() { // Stopped because of a `shutdown` call. @@ -325,7 +324,7 @@ where self.shutdown_notify_back.notify_waiters(); log::debug!("Dispatching shut down"); } else { - log::debug!("Dispatching stopped (listner returned `None`)"); + log::debug!("Dispatching stopped (listener returned `None`)"); } self.shutdown_state.store(IsntRunning); diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index 651cae53..ee2adee5 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -46,11 +46,14 @@ //! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot pub mod dialogue; +pub mod stop_token; +pub mod update_listeners; + +pub(crate) mod repls; + mod dispatcher; mod dispatcher_handler; mod dispatcher_handler_rx_ext; -pub(crate) mod repls; -pub mod update_listeners; mod update_with_cx; pub use dispatcher::Dispatcher; diff --git a/src/dispatching/stop_token.rs b/src/dispatching/stop_token.rs new file mode 100644 index 00000000..faea1ed2 --- /dev/null +++ b/src/dispatching/stop_token.rs @@ -0,0 +1,74 @@ +use std::{future::Future, pin::Pin, task}; + +use futures::future::{pending, AbortHandle, Abortable, Pending}; + +/// A stop token allows you to stop listener. +/// +/// See also: [`UpdateListener::stop_token`]. +/// +/// [`UpdateListener::stop_token`]: +/// crate::dispatching::update_listeners::UpdateListener::stop_token +pub trait StopToken { + /// Stop the listener linked to this token. + fn stop(self); +} + +/// A stop token which does nothing. May be used in prototyping or in cases +/// where you do not care about graceful shutdowning. +pub struct Noop; + +impl StopToken for Noop { + fn stop(self) {} +} + +/// A stop token which corresponds to [`AsyncStopFlag`]. +#[derive(Clone)] +pub struct AsyncStopToken(AbortHandle); + +/// A flag which corresponds to [`AsyncStopToken`]. +/// +/// To know if stop token was used you can either repeatedly call [`is_stopped`] +/// or use this type as a `Future`. +/// +/// [`is_stopped`]: AsyncStopFlag::is_stopped +#[pin_project::pin_project] +pub struct AsyncStopFlag(#[pin] Abortable>); + +impl AsyncStopToken { + /// Create a new token/flag pair. + pub fn new_pair() -> (Self, AsyncStopFlag) { + let (handle, reg) = AbortHandle::new_pair(); + let token = Self(handle); + let flag = AsyncStopFlag(Abortable::new(pending(), reg)); + + (token, flag) + } +} + +impl StopToken for AsyncStopToken { + fn stop(self) { + self.0.abort() + } +} + +impl AsyncStopFlag { + /// Returns true if stop token linked to `self` was used. + pub fn is_stopped(&self) -> bool { + self.0.is_aborted() + } +} + +/// This future resolves when a stop token was used. +impl Future for AsyncStopFlag { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + self.project().0.poll(cx).map(|res| { + debug_assert!( + res.is_err(), + "Pending Future can't ever be resolved, so Abortable is only resolved when \ + canceled" + ); + }) + } +} diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 0e56edd7..d74d36cf 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -103,15 +103,20 @@ //! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science) //! [webhook]: https://en.wikipedia.org/wiki/Webhook -use futures::{stream, Stream, StreamExt}; +use futures::{ + future::{ready, Either}, + stream, Stream, StreamExt, +}; -use std::{convert::TryInto, iter, time::Duration, vec}; +use std::{convert::TryInto, time::Duration}; use teloxide_core::{ payloads::GetUpdates, requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, SemiparsedVec, Update}, }; +use crate::dispatching::stop_token::{AsyncStopFlag, AsyncStopToken, StopToken}; + /// An update listener. /// /// Implementors of this trait allow getting updates from Telegram. @@ -130,20 +135,33 @@ use teloxide_core::{ /// [`Stream`]: AsUpdateStream::Stream /// [`as_stream`]: AsUpdateStream::as_stream pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { - /// Stop listening for updates. + /// Type of token which allows ti stop this listener. + type StopToken: StopToken; + + /// Returns a token which stops this listener. /// - /// This function is not guaranteed to have an immediate effect. That is - /// some listeners can return updates even after [`stop`] is called (e.g.: - /// because of buffering). + /// The [`stop`] function of the token is not guaranteed to have an + /// immediate effect. That is some listeners can return updates even + /// after [`stop`] is called (e.g.: because of buffering). /// - /// [`stop`]: UpdateListener::stop + /// [`stop`]: StopToken::stop /// /// Implementors of this function are encouraged to stop listening for /// updates as soon as possible and return `None` from the update stream as /// soon as all cached updates are returned. - fn stop(&mut self); + #[must_use = "This function doesn't stop listening, to stop listening you need to call stop on \ + the returned token"] + fn stop_token(&mut self) -> Self::StopToken; /// Timeout duration hint. + /// + /// This hints how often dispatcher should check for shutdown. E.g. for + /// [`polling`] this returns the [`timeout`]. + /// + /// [`timeout`]: crate::payloads::GetUpdates::timeout + /// + /// If you are implementing this trait and not sure what to return from this + /// function, just leave it with default implementation. fn timeout_hint(&self) -> Option { None } @@ -159,8 +177,6 @@ pub trait AsUpdateStream<'a, E> { /// Creates the update [`Stream`]. /// /// [`Stream`]: AsUpdateStream::Stream - /// - /// Returned stream **must not** be stateful. State must be kept in `self`. fn as_stream(&'a mut self) -> Self::Stream; } @@ -202,35 +218,14 @@ where R: Requester + 'static, ::GetUpdatesFaultTolerant: Send, { - enum RunningState { - Polling, - Stopping, - Stopped, - } - struct State { bot: B, timeout: Option, limit: Option, allowed_updates: Option>, offset: i32, - run_state: RunningState, - - // Updates fetched last time. - // - // We need to store them here so we can drop stream without loosing state. - #[allow(clippy::type_complexity)] - fetched: Option< - iter::Map< - iter::FilterMap< - vec::IntoIter>, - fn( - Result, - ) -> std::option::Option, - >, - fn(Update) -> Result, - >, - >, + flag: AsyncStopFlag, + token: AsyncStopToken, } fn stream(st: &mut State) -> impl Stream> + '_ @@ -238,105 +233,87 @@ where B: Requester, { stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, run_state, fetched, .. } = - &mut *state; - - let fetched_is_none_or_empty = fetched - .as_ref() - .map(|f| matches!(f.size_hint(), (_lower, Some(0)))) - .unwrap_or(true); - - if fetched_is_none_or_empty { - match run_state { - RunningState::Polling => {} - RunningState::Stopped => return None, - RunningState::Stopping => { - let mut req = bot.get_updates_fault_tolerant(); - - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: *timeout, - limit: Some(1), - allowed_updates: allowed_updates.take(), - }; - - return match req.send().await { - Ok(_) => { - *run_state = RunningState::Stopped; - None - } - Err(err) => Some((stream::iter(Some(Err(err))), state)), - }; - } - } + let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; + if flag.is_stopped() { let mut req = bot.get_updates_fault_tolerant(); + req.payload_mut().0 = GetUpdates { offset: Some(*offset), - timeout: *timeout, - limit: *limit, + timeout: Some(0), + limit: Some(1), allowed_updates: allowed_updates.take(), }; - let updates = match req.send().await { - Err(err) => return Some((stream::iter(Some(Err(err))), state)), - Ok(SemiparsedVec(updates)) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), - }; + return match req.send().await { + Ok(_) => None, + Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), + }; + } - *offset = id + 1; - } + let mut req = bot.get_updates_fault_tolerant(); + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: *timeout, + limit: *limit, + allowed_updates: allowed_updates.take(), + }; - for update in &updates { - if let Err((value, e)) = update { - log::error!( + let updates = match req.send().await { + Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), + Ok(SemiparsedVec(updates)) => { + // Set offset to the last update's id + 1 + if let Some(upd) = updates.last() { + let id: i32 = match upd { + Ok(ok) => ok.id, + Err((value, _)) => value["update_id"] + .as_i64() + .expect("The 'update_id' field must always exist in Update") + .try_into() + .expect("update_id must be i32"), + }; + + *offset = id + 1; + } + + for update in &updates { + if let Err((value, e)) = update { + log::error!( "Cannot parse an update.\nError: {:?}\nValue: {}\n\ This is a bug in teloxide-core, please open an issue here: \ https://github.com/teloxide/teloxide-core/issues.", e, value ); - } } - - updates - .into_iter() - .filter_map(Result::ok as fn(_) -> _) - .map(Ok as fn(_) -> _) } - }; - *fetched = Some(updates); - } + updates.into_iter().filter_map(Result::ok).map(Ok) + } + }; - Some((stream::iter(fetched.as_mut().and_then(|f| f.next())), state)) + Some((Either::Right(stream::iter(updates)), state)) }) .flatten() } + let (token, flag) = AsyncStopToken::new_pair(); + let state = State { bot: requester, timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), limit, allowed_updates, offset: 0, - run_state: RunningState::Polling, - fetched: None, + flag, + token, }; - let stop = Some(|st: &mut State<_>| st.run_state = RunningState::Stopping); + let stop = |st: &mut State<_>| st.token.clone(); let timeout_hint = Some(move |_: &State<_>| timeout); - StatefulListner { state, stream, stop, timeout_hint } + StatefulListener { state, stream, stop, timeout_hint } } async fn delete_webhook_if_setup(requester: &R) @@ -360,9 +337,9 @@ where } } -/// A listner created from `state` and `stream`/`stop` functions. -struct StatefulListner { - /// The state of the listner. +/// A listener created from `state` and `stream`/`stop` functions. +struct StatefulListener { + /// The state of the listener. state: St, /// Function used as `AsUpdateStream::as_stream`. @@ -371,19 +348,19 @@ struct StatefulListner { /// `&mut`. stream: Assf, - /// Function used as `UpdateListner::stop`. + /// Function used as `UpdateListener::stop`. /// - /// Must be of type `for<'a> &'a mut St`. - stop: Option, + /// Must be of type `for<'a> &'a mut St -> impl StopToken`. + stop: Sf, - /// Function used as `UpdateListner::timeout_hint`. + /// Function used as `UpdateListener::timeout_hint`. /// /// Must be of type `for<'a> &'a St -> Option` and callable by /// `&`. timeout_hint: Option, } -impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListner +impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener where (St, Strm): 'a, Assf: FnMut(&'a mut St) -> Strm, @@ -396,16 +373,17 @@ where } } -impl UpdateListener for StatefulListner +impl UpdateListener for StatefulListener where Self: for<'a> AsUpdateStream<'a, E>, - Sf: FnOnce(&mut St), + Sf: FnMut(&mut St) -> Stt, + Stt: StopToken, Thf: Fn(&St) -> Option, { - fn stop(&mut self) { - if let Some(stop) = self.stop.take() { - stop(&mut self.state) - } + type StopToken = Stt; + + fn stop_token(&mut self) -> Stt { + (self.stop)(&mut self.state) } fn timeout_hint(&self) -> Option { From 881aa3d6b6c2e18d0f1a6793a737f8107c74d5c9 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 22 May 2021 00:08:03 +0300 Subject: [PATCH 12/28] Split `update_listeners` into multiple modules --- src/dispatching/update_listeners.rs | 236 +----------------- src/dispatching/update_listeners/polling.rs | 173 +++++++++++++ .../update_listeners/stateful_listener.rs | 63 +++++ 3 files changed, 247 insertions(+), 225 deletions(-) create mode 100644 src/dispatching/update_listeners/polling.rs create mode 100644 src/dispatching/update_listeners/stateful_listener.rs diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index d74d36cf..1d9652bb 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -96,33 +96,30 @@ //! //! [`UpdateListener`]: UpdateListener //! [`polling_default`]: polling_default -//! [`polling`]: polling +//! [`polling`]: polling() //! [`Box::get_updates`]: crate::requests::Requester::get_updates //! [getting updates]: https://core.telegram.org/bots/api#getting-updates //! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling //! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science) //! [webhook]: https://en.wikipedia.org/wiki/Webhook -use futures::{ - future::{ready, Either}, - stream, Stream, StreamExt, -}; +use futures::Stream; -use std::{convert::TryInto, time::Duration}; -use teloxide_core::{ - payloads::GetUpdates, - requests::{HasPayload, Request, Requester}, - types::{AllowedUpdate, SemiparsedVec, Update}, -}; +use std::time::Duration; -use crate::dispatching::stop_token::{AsyncStopFlag, AsyncStopToken, StopToken}; +use crate::{dispatching::stop_token::StopToken, types::Update}; + +mod polling; +mod stateful_listener; + +pub use self::polling::{polling, polling_default}; /// An update listener. /// /// Implementors of this trait allow getting updates from Telegram. /// /// Currently Telegram has 2 ways of getting updates -- [polling] and -/// [webhooks]. Currently, only the former one is implemented (see [`polling`] +/// [webhooks]. Currently, only the former one is implemented (see [`polling()`] /// and [`polling_default`]) /// /// Some functions of this trait are located in the supertrait @@ -156,7 +153,7 @@ pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { /// Timeout duration hint. /// /// This hints how often dispatcher should check for shutdown. E.g. for - /// [`polling`] this returns the [`timeout`]. + /// [`polling()`] this returns the [`timeout`]. /// /// [`timeout`]: crate::payloads::GetUpdates::timeout /// @@ -179,214 +176,3 @@ pub trait AsUpdateStream<'a, E> { /// [`Stream`]: AsUpdateStream::Stream fn as_stream(&'a mut self) -> Self::Stream; } - -/// Returns a long polling update listener with `timeout` of 10 seconds. -/// -/// See also: [`polling`](polling). -/// -/// ## Notes -/// -/// This function will automatically delete a webhook if it was set up. -pub async fn polling_default(requester: R) -> impl UpdateListener -where - R: Requester + 'static, - ::GetUpdatesFaultTolerant: Send, -{ - delete_webhook_if_setup(&requester).await; - polling(requester, Some(Duration::from_secs(10)), None, None) -} - -/// Returns a long/short polling update listener with some additional options. -/// -/// - `bot`: Using this bot, the returned update listener will receive updates. -/// - `timeout`: A timeout for polling. -/// - `limit`: Limits the number of updates to be retrieved at once. Values -/// between 1—100 are accepted. -/// - `allowed_updates`: A list the types of updates you want to receive. -/// See [`GetUpdates`] for defaults. -/// -/// See also: [`polling_default`](polling_default). -/// -/// [`GetUpdates`]: crate::payloads::GetUpdates -pub fn polling( - requester: R, - timeout: Option, - limit: Option, - allowed_updates: Option>, -) -> impl UpdateListener -where - R: Requester + 'static, - ::GetUpdatesFaultTolerant: Send, -{ - struct State { - bot: B, - timeout: Option, - limit: Option, - allowed_updates: Option>, - offset: i32, - flag: AsyncStopFlag, - token: AsyncStopToken, - } - - fn stream(st: &mut State) -> impl Stream> + '_ - where - B: Requester, - { - stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; - - if flag.is_stopped() { - let mut req = bot.get_updates_fault_tolerant(); - - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: Some(0), - limit: Some(1), - allowed_updates: allowed_updates.take(), - }; - - return match req.send().await { - Ok(_) => None, - Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), - }; - } - - let mut req = bot.get_updates_fault_tolerant(); - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: *timeout, - limit: *limit, - allowed_updates: allowed_updates.take(), - }; - - let updates = match req.send().await { - Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), - Ok(SemiparsedVec(updates)) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), - }; - - *offset = id + 1; - } - - for update in &updates { - if let Err((value, e)) = update { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide-core/issues.", - e, - value - ); - } - } - - updates.into_iter().filter_map(Result::ok).map(Ok) - } - }; - - Some((Either::Right(stream::iter(updates)), state)) - }) - .flatten() - } - - let (token, flag) = AsyncStopToken::new_pair(); - - let state = State { - bot: requester, - timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), - limit, - allowed_updates, - offset: 0, - flag, - token, - }; - - let stop = |st: &mut State<_>| st.token.clone(); - - let timeout_hint = Some(move |_: &State<_>| timeout); - - StatefulListener { state, stream, stop, timeout_hint } -} - -async fn delete_webhook_if_setup(requester: &R) -where - R: Requester, -{ - let webhook_info = match requester.get_webhook_info().send().await { - Ok(ok) => ok, - Err(e) => { - log::error!("Failed to get webhook info: {:?}", e); - return; - } - }; - - let is_webhook_setup = !webhook_info.url.is_empty(); - - if is_webhook_setup { - if let Err(e) = requester.delete_webhook().send().await { - log::error!("Failed to delete a webhook: {:?}", e); - } - } -} - -/// A listener created from `state` and `stream`/`stop` functions. -struct StatefulListener { - /// The state of the listener. - state: St, - - /// Function used as `AsUpdateStream::as_stream`. - /// - /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by - /// `&mut`. - stream: Assf, - - /// Function used as `UpdateListener::stop`. - /// - /// Must be of type `for<'a> &'a mut St -> impl StopToken`. - stop: Sf, - - /// Function used as `UpdateListener::timeout_hint`. - /// - /// Must be of type `for<'a> &'a St -> Option` and callable by - /// `&`. - timeout_hint: Option, -} - -impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener -where - (St, Strm): 'a, - Assf: FnMut(&'a mut St) -> Strm, - Strm: Stream>, -{ - type Stream = Strm; - - fn as_stream(&'a mut self) -> Self::Stream { - (self.stream)(&mut self.state) - } -} - -impl UpdateListener for StatefulListener -where - Self: for<'a> AsUpdateStream<'a, E>, - Sf: FnMut(&mut St) -> Stt, - Stt: StopToken, - Thf: Fn(&St) -> Option, -{ - type StopToken = Stt; - - fn stop_token(&mut self) -> Stt { - (self.stop)(&mut self.state) - } - - fn timeout_hint(&self) -> Option { - self.timeout_hint.as_ref().and_then(|f| f(&self.state)) - } -} diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs new file mode 100644 index 00000000..6d65dc45 --- /dev/null +++ b/src/dispatching/update_listeners/polling.rs @@ -0,0 +1,173 @@ +use std::{convert::TryInto, time::Duration}; + +use futures::{ + future::{ready, Either}, + stream::{self, Stream, StreamExt}, +}; + +use crate::{ + dispatching::{ + stop_token::{AsyncStopFlag, AsyncStopToken}, + update_listeners::{stateful_listener::StatefulListener, UpdateListener}, + }, + payloads::GetUpdates, + requests::{HasPayload, Request, Requester}, + types::{AllowedUpdate, SemiparsedVec, Update}, +}; + +/// Returns a long polling update listener with `timeout` of 10 seconds. +/// +/// See also: [`polling`](polling). +/// +/// ## Notes +/// +/// This function will automatically delete a webhook if it was set up. +pub async fn polling_default(requester: R) -> impl UpdateListener +where + R: Requester + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + delete_webhook_if_setup(&requester).await; + polling(requester, Some(Duration::from_secs(10)), None, None) +} + +/// Returns a long/short polling update listener with some additional options. +/// +/// - `bot`: Using this bot, the returned update listener will receive updates. +/// - `timeout`: A timeout for polling. +/// - `limit`: Limits the number of updates to be retrieved at once. Values +/// between 1—100 are accepted. +/// - `allowed_updates`: A list the types of updates you want to receive. +/// See [`GetUpdates`] for defaults. +/// +/// See also: [`polling_default`](polling_default). +/// +/// [`GetUpdates`]: crate::payloads::GetUpdates +pub fn polling( + requester: R, + timeout: Option, + limit: Option, + allowed_updates: Option>, +) -> impl UpdateListener +where + R: Requester + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + struct State { + bot: B, + timeout: Option, + limit: Option, + allowed_updates: Option>, + offset: i32, + flag: AsyncStopFlag, + token: AsyncStopToken, + } + + fn stream(st: &mut State) -> impl Stream> + '_ + where + B: Requester, + { + stream::unfold(st, move |state| async move { + let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; + + if flag.is_stopped() { + let mut req = bot.get_updates_fault_tolerant(); + + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: Some(0), + limit: Some(1), + allowed_updates: allowed_updates.take(), + }; + + return match req.send().await { + Ok(_) => None, + Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), + }; + } + + let mut req = bot.get_updates_fault_tolerant(); + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: *timeout, + limit: *limit, + allowed_updates: allowed_updates.take(), + }; + + let updates = match req.send().await { + Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), + Ok(SemiparsedVec(updates)) => { + // Set offset to the last update's id + 1 + if let Some(upd) = updates.last() { + let id: i32 = match upd { + Ok(ok) => ok.id, + Err((value, _)) => value["update_id"] + .as_i64() + .expect("The 'update_id' field must always exist in Update") + .try_into() + .expect("update_id must be i32"), + }; + + *offset = id + 1; + } + + for update in &updates { + if let Err((value, e)) = update { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide-core/issues.", + e, + value + ); + } + } + + updates.into_iter().filter_map(Result::ok).map(Ok) + } + }; + + Some((Either::Right(stream::iter(updates)), state)) + }) + .flatten() + } + + let (token, flag) = AsyncStopToken::new_pair(); + + let state = State { + bot: requester, + timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), + limit, + allowed_updates, + offset: 0, + flag, + token, + }; + + let stop = |st: &mut State<_>| st.token.clone(); + + let timeout_hint = Some(move |_: &State<_>| timeout); + + StatefulListener { state, stream, stop, timeout_hint } +} + +async fn delete_webhook_if_setup(requester: &R) +where + R: Requester, +{ + let webhook_info = match requester.get_webhook_info().send().await { + Ok(ok) => ok, + Err(e) => { + log::error!("Failed to get webhook info: {:?}", e); + return; + } + }; + + let is_webhook_setup = !webhook_info.url.is_empty(); + + if is_webhook_setup { + if let Err(e) = requester.delete_webhook().send().await { + log::error!("Failed to delete a webhook: {:?}", e); + } + } +} diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs new file mode 100644 index 00000000..8d6b2913 --- /dev/null +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -0,0 +1,63 @@ +use std::time::Duration; + +use futures::Stream; +use teloxide_core::types::Update; + +use crate::dispatching::{ + stop_token::StopToken, + update_listeners::{AsUpdateStream, UpdateListener}, +}; + +/// A listener created from `state` and `stream`/`stop` functions. +pub(crate) struct StatefulListener { + /// The state of the listener. + pub(crate) state: St, + + /// Function used as `AsUpdateStream::as_stream`. + /// + /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by + /// `&mut`. + pub(crate) stream: Assf, + + /// Function used as `UpdateListener::stop`. + /// + /// Must be of type `for<'a> &'a mut St -> impl StopToken`. + pub(crate) stop: Sf, + + /// Function used as `UpdateListener::timeout_hint`. + /// + /// Must be of type `for<'a> &'a St -> Option` and callable by + /// `&`. + pub(crate) timeout_hint: Option, +} + +impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener +where + (St, Strm): 'a, + Assf: FnMut(&'a mut St) -> Strm, + Strm: Stream>, +{ + type Stream = Strm; + + fn as_stream(&'a mut self) -> Self::Stream { + (self.stream)(&mut self.state) + } +} + +impl UpdateListener for StatefulListener +where + Self: for<'a> AsUpdateStream<'a, E>, + Sf: FnMut(&mut St) -> Stt, + Stt: StopToken, + Thf: Fn(&St) -> Option, +{ + type StopToken = Stt; + + fn stop_token(&mut self) -> Stt { + (self.stop)(&mut self.state) + } + + fn timeout_hint(&self) -> Option { + self.timeout_hint.as_ref().and_then(|f| f(&self.state)) + } +} From f58ae9b9ca82bc011a8e80a2efa0dd352b37a85b Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 22 May 2021 00:37:16 +0300 Subject: [PATCH 13/28] Make StatefulListener pub --- src/dispatching/update_listeners.rs | 5 +- src/dispatching/update_listeners/polling.rs | 2 +- .../update_listeners/stateful_listener.rs | 77 ++++++++++++++++--- 3 files changed, 71 insertions(+), 13 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 1d9652bb..b8448023 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -112,7 +112,10 @@ use crate::{dispatching::stop_token::StopToken, types::Update}; mod polling; mod stateful_listener; -pub use self::polling::{polling, polling_default}; +pub use self::{ + polling::{polling, polling_default}, + stateful_listener::StatefulListener, +}; /// An update listener. /// diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 6d65dc45..62cec6d6 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -148,7 +148,7 @@ where let timeout_hint = Some(move |_: &State<_>| timeout); - StatefulListener { state, stream, stop, timeout_hint } + StatefulListener { state, stream, stop_token: stop, timeout_hint } } async fn delete_webhook_if_setup(requester: &R) diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs index 8d6b2913..9150daef 100644 --- a/src/dispatching/update_listeners/stateful_listener.rs +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -4,31 +4,79 @@ use futures::Stream; use teloxide_core::types::Update; use crate::dispatching::{ - stop_token::StopToken, + stop_token::{self, StopToken}, update_listeners::{AsUpdateStream, UpdateListener}, }; -/// A listener created from `state` and `stream`/`stop` functions. -pub(crate) struct StatefulListener { +/// A listener created from functions. +/// +/// This type allows to turn a stream of updates (+some additional functions) +/// into an [`UpdateListener`]. +/// +/// For an example of usage see [`polling`] +/// +/// [`polling`]: crate::dispatching::update_listeners::polling() +#[non_exhaustive] +pub struct StatefulListener { /// The state of the listener. - pub(crate) state: St, + pub state: St, - /// Function used as `AsUpdateStream::as_stream`. + /// Function used as [`AsUpdateStream::as_stream`]. /// /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by /// `&mut`. - pub(crate) stream: Assf, + pub stream: Assf, - /// Function used as `UpdateListener::stop`. + /// Function used as [`UpdateListener::stop_token`]. /// /// Must be of type `for<'a> &'a mut St -> impl StopToken`. - pub(crate) stop: Sf, + pub stop_token: Sf, - /// Function used as `UpdateListener::timeout_hint`. + /// Function used as [`UpdateListener::timeout_hint`]. /// /// Must be of type `for<'a> &'a St -> Option` and callable by /// `&`. - pub(crate) timeout_hint: Option, + pub timeout_hint: Option, +} + +impl StatefulListener { + /// Creates new stateful listener from it's components. + pub fn new(state: St, stream: Assf, stop_token: Sf, timeout_hint: Option) -> Self { + Self { state, stream, stop_token, timeout_hint } + } +} + +impl + StatefulListener< + S, + for<'a> fn(&'a mut S) -> &'a mut S, + for<'a> fn(&'a mut S) -> stop_token::Noop, + for<'a> fn(&'a S) -> Option, + > +where + S: Stream> + Unpin + 'static, +{ + /// Creates a new update listner from a stream of updates which ignore stop + /// signals. + /// + /// It won't be possible to ever stop this listener with stop token. + pub fn from_stream_without_graceful_shutdown(stream: S) -> Self { + let this = Self { + state: stream, + stream: |s| s, + stop_token: |_| stop_token::Noop, + timeout_hint: Some(|_| { + // FIXME: replace this by just Duration::MAX once 1.53 releases + // be released + const NANOS_PER_SEC: u32 = 1_000_000_000; + let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1); + + Some(dmax) + }), + }; + + assert_update_listener(this) + } } impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener @@ -54,10 +102,17 @@ where type StopToken = Stt; fn stop_token(&mut self) -> Stt { - (self.stop)(&mut self.state) + (self.stop_token)(&mut self.state) } fn timeout_hint(&self) -> Option { self.timeout_hint.as_ref().and_then(|f| f(&self.state)) } } + +fn assert_update_listener(l: L) -> L +where + L: UpdateListener, +{ + l +} From 8785b8263cb4caebf212e2a66a19f73e653eb060 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 22 May 2021 00:37:53 +0300 Subject: [PATCH 14/28] Fix overflow in `dispatch_with_listener` --- src/dispatching/dispatcher.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 362d4157..1cca7f09 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -287,8 +287,12 @@ where // FIXME: replace this by just Duration::ZERO once 1.53 will be released const DZERO: Duration = Duration::from_secs(0); - let shutdown_check_timeout = - update_listener.timeout_hint().unwrap_or(DZERO) + MIN_SHUTDOWN_CHECK_TIMEOUT; + let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); + + // FIXME: replace this by just saturating_add once 1.53 will be released + let shutdown_check_timeout = shutdown_check_timeout + .checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT) + .unwrap_or(shutdown_check_timeout); let mut stop_token = Some(update_listener.stop_token()); From 76dee997e0ee15321602cc99e77635cd6ea8e747 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 22 May 2021 01:15:32 +0300 Subject: [PATCH 15/28] Fix webhook examples (support graceful shutdown) --- examples/heroku_ping_pong_bot/src/main.rs | 24 +++++++++++++++------ examples/ngrok_ping_pong_bot/src/main.rs | 20 +++++++++++------ src/dispatching/update_listeners/polling.rs | 4 ++-- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/examples/heroku_ping_pong_bot/src/main.rs b/examples/heroku_ping_pong_bot/src/main.rs index 8f02734a..ddb7769f 100644 --- a/examples/heroku_ping_pong_bot/src/main.rs +++ b/examples/heroku_ping_pong_bot/src/main.rs @@ -1,7 +1,7 @@ // The version of Heroku ping-pong-bot, which uses a webhook to receive updates // from Telegram, instead of long polling. -use teloxide::{dispatching::update_listeners, prelude::*, types::Update}; +use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update}; use std::{convert::Infallible, env, net::SocketAddr}; use tokio::sync::mpsc; @@ -20,8 +20,8 @@ async fn handle_rejection(error: warp::Rejection) -> Result(bot: AutoSend) -> impl update_listeners::UpdateListener { - // Heroku defines auto defines a port value +pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListener { + // Heroku auto defines a port value let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing"); let port: u16 = env::var("PORT") .expect("PORT env variable missing") @@ -48,11 +48,21 @@ pub async fn webhook<'a>(bot: AutoSend) -> impl update_listeners::UpdateLis }) .recover(handle_rejection); - let serve = warp::serve(server); + let (stop_token, stop_flag) = AsyncStopToken::new_pair(); - let address = format!("0.0.0.0:{}", port); - tokio::spawn(serve.run(address.parse::().unwrap())); - UnboundedReceiverStream::new(rx) + let addr = format!("0.0.0.0:{}", port).parse::().unwrap(); + let server = warp::serve(server); + let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag); + + // You might want to use serve.key_path/serve.cert_path methods here to + // setup a self-signed TLS certificate. + + tokio::spawn(fut); + let stream = UnboundedReceiverStream::new(rx); + + fn streamf(state: &mut (S, T)) -> &mut S { &mut state.0 } + + StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone(), None:: fn(&'a _) -> _>) } async fn run() { diff --git a/examples/ngrok_ping_pong_bot/src/main.rs b/examples/ngrok_ping_pong_bot/src/main.rs index a00c60ee..8863eb8f 100644 --- a/examples/ngrok_ping_pong_bot/src/main.rs +++ b/examples/ngrok_ping_pong_bot/src/main.rs @@ -1,7 +1,7 @@ // The version of ngrok ping-pong-bot, which uses a webhook to receive updates // from Telegram, instead of long polling. -use teloxide::{dispatching::update_listeners, prelude::*, types::Update}; +use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update}; use std::{convert::Infallible, net::SocketAddr}; use tokio::sync::mpsc; @@ -20,10 +20,10 @@ async fn handle_rejection(error: warp::Rejection) -> Result(bot: AutoSend) -> impl update_listeners::UpdateListener { +pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListener { // You might want to specify a self-signed certificate via .certificate // method on SetWebhook. - bot.set_webhook("Your HTTPS ngrok URL here. Get it by 'ngrok http 80'") + bot.set_webhook("Your HTTPS ngrok URL here. Get it by `ngrok http 80`") .await .expect("Cannot setup a webhook"); @@ -40,13 +40,21 @@ pub async fn webhook<'a>(bot: AutoSend) -> impl update_listeners::UpdateLis }) .recover(handle_rejection); - let serve = warp::serve(server); + let (stop_token, stop_flag) = AsyncStopToken::new_pair(); + + let addr = "127.0.0.1:80".parse::().unwrap(); + let server = warp::serve(server); + let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag); // You might want to use serve.key_path/serve.cert_path methods here to // setup a self-signed TLS certificate. - tokio::spawn(serve.run("127.0.0.1:80".parse::().unwrap())); - UnboundedReceiverStream::new(rx) + tokio::spawn(fut); + let stream = UnboundedReceiverStream::new(rx); + + fn streamf(state: &mut (S, T)) -> &mut S { &mut state.0 } + + StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone(), None:: fn(&'a _) -> _>) } async fn run() { diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 62cec6d6..50f1626c 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -144,11 +144,11 @@ where token, }; - let stop = |st: &mut State<_>| st.token.clone(); + let stop_token = |st: &mut State<_>| st.token.clone(); let timeout_hint = Some(move |_: &State<_>| timeout); - StatefulListener { state, stream, stop_token: stop, timeout_hint } + StatefulListener { state, stream, stop_token, timeout_hint } } async fn delete_webhook_if_setup(requester: &R) From c378d6ef4e524da96718beec6f989e8ac51d1531 Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 25 Jun 2021 17:24:31 +0300 Subject: [PATCH 16/28] Make `StatefulListener::new` a little more convinient Remove `timeout_hint` from the `StatefulListener::new` function. This parameter is confusing and is likely to be set to `None` in most cases. Add a `StatefulListener::new_with_timeout_hint` parameter which is the same as the old `StatefulListener::new`. --- examples/heroku_ping_pong_bot/src/main.rs | 2 +- examples/ngrok_ping_pong_bot/src/main.rs | 2 +- .../update_listeners/stateful_listener.rs | 14 +++++++++++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/examples/heroku_ping_pong_bot/src/main.rs b/examples/heroku_ping_pong_bot/src/main.rs index ddb7769f..2043ec49 100644 --- a/examples/heroku_ping_pong_bot/src/main.rs +++ b/examples/heroku_ping_pong_bot/src/main.rs @@ -62,7 +62,7 @@ pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListene fn streamf(state: &mut (S, T)) -> &mut S { &mut state.0 } - StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone(), None:: fn(&'a _) -> _>) + StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone()) } async fn run() { diff --git a/examples/ngrok_ping_pong_bot/src/main.rs b/examples/ngrok_ping_pong_bot/src/main.rs index 8863eb8f..8a35d5ad 100644 --- a/examples/ngrok_ping_pong_bot/src/main.rs +++ b/examples/ngrok_ping_pong_bot/src/main.rs @@ -54,7 +54,7 @@ pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListene fn streamf(state: &mut (S, T)) -> &mut S { &mut state.0 } - StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone(), None:: fn(&'a _) -> _>) + StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone()) } async fn run() { diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs index 9150daef..b7a243c6 100644 --- a/src/dispatching/update_listeners/stateful_listener.rs +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -39,9 +39,21 @@ pub struct StatefulListener { pub timeout_hint: Option, } +impl StatefulListener fn(&'a St) -> Option> { + /// Creates new stateful listener from it's components. + pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self { + Self { state, stream, stop_token, timeout_hint: None } + } +} + impl StatefulListener { /// Creates new stateful listener from it's components. - pub fn new(state: St, stream: Assf, stop_token: Sf, timeout_hint: Option) -> Self { + pub fn new_with_timeout_hint( + state: St, + stream: Assf, + stop_token: Sf, + timeout_hint: Option, + ) -> Self { Self { state, stream, stop_token, timeout_hint } } } From 569ef222fcc37b1b1d29ead76b9964dde20cf4e4 Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 25 Jun 2021 17:41:02 +0300 Subject: [PATCH 17/28] Add dispatcher shutdown token This commit adds `ShutdownToken` which can be obrained throught `Dispatcher::shutdown_token` and then later be used to shutdown dispatching. --- src/dispatching/dispatcher.rs | 83 +++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 29 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 1cca7f09..069b016e 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -30,22 +30,6 @@ use tokio::{ type Tx = Option>>; -fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx, update: Upd, variant: &'static str) -where - Upd: Debug, - R: Requester + Clone, -{ - if let Some(tx) = tx { - if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) { - log::error!( - "The RX part of the {} channel is closed, but an update is received.\nError:{}\n", - variant, - error - ); - } - } -} - /// One dispatcher to rule them all. /// /// See the [module-level documentation](crate::dispatching) for the design @@ -68,7 +52,7 @@ pub struct Dispatcher { chat_members_queue: Tx, shutdown_state: Arc, - shutdown_notify_back: Notify, + shutdown_notify_back: Arc, } impl Dispatcher @@ -96,7 +80,7 @@ where shutdown_state: Arc::new(AtomicShutdownState { inner: AtomicU8::new(ShutdownState::IsntRunning as _), }), - shutdown_notify_back: Notify::new(), + shutdown_notify_back: Arc::new(Notify::new()), } } @@ -128,7 +112,7 @@ where tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); // If dispatcher wasn't running, then there is nothing to do - Self::shutdown_inner(&shutdown_state).ok(); + shutdown_inner(&shutdown_state).ok(); } }); @@ -340,19 +324,15 @@ where /// /// If you don't need to wait for shutdown, returned future can be ignored. pub fn shutdown(&self) -> Result + '_, ShutdownError> { - Self::shutdown_inner(&self.shutdown_state) + shutdown_inner(&self.shutdown_state) .map(|()| async move { self.shutdown_notify_back.notified().await }) } - fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { - use ShutdownState::*; - - let res = shutdown_state.compare_exchange(Running, ShuttingDown); - - match res { - Ok(_) | Err(ShuttingDown) => Ok(()), - Err(IsntRunning) => Err(ShutdownError::IsntRunning), - Err(Running) => unreachable!(), + /// Returns shutdown token, which can later be used to shutdown dispatching. + pub fn shutdown_token(&self) -> ShutdownToken { + ShutdownToken { + shutdown_state: Arc::clone(&self.shutdown_state), + shutdown_notify_back: Arc::clone(&self.shutdown_notify_back), } } @@ -454,6 +434,23 @@ where } } +pub struct ShutdownToken { + shutdown_state: Arc, + shutdown_notify_back: Arc, +} + +impl ShutdownToken { + /// Tries to shutdown dispatching. + /// + /// Returns error if this dispather isn't dispatching at the moment. + /// + /// If you don't need to wait for shutdown, returned future can be ignored. + pub fn shutdown(&self) -> Result + '_, ShutdownError> { + shutdown_inner(&self.shutdown_state) + .map(|()| async move { self.shutdown_notify_back.notified().await }) + } +} + #[derive(Debug)] pub enum ShutdownError { IsntRunning, @@ -505,3 +502,31 @@ impl ShutdownState { } } } + +fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { + use ShutdownState::*; + + let res = shutdown_state.compare_exchange(Running, ShuttingDown); + + match res { + Ok(_) | Err(ShuttingDown) => Ok(()), + Err(IsntRunning) => Err(ShutdownError::IsntRunning), + Err(Running) => unreachable!(), + } +} + +fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx, update: Upd, variant: &'static str) +where + Upd: Debug, + R: Requester + Clone, +{ + if let Some(tx) = tx { + if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) { + log::error!( + "The RX part of the {} channel is closed, but an update is received.\nError:{}\n", + variant, + error + ); + } + } +} From 7cfb207b447b34c98efa9556e73ad5ac9e82ad7b Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 26 Jun 2021 14:33:10 +0300 Subject: [PATCH 18/28] Wait for handlers to finish before shutting down dispatcher This commit makes `Dispatcher::dispatch_with_listener` (and all it's derivatives like `Dispatcher::dispatch`, `teloxide::repl`) to wait for handlers to finish berfore shutting down. This commit also changes self-type for `Dispatcher::dispatch_with_listener` and `Dispatcher::dispatch` from `&Self` to `&mut Self` (this is required to wait for handlers to finish). Since unique reference is now required for listening,`Dispatcher::shutdown` function is removed. --- src/dispatching/dispatcher.rs | 60 ++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 069b016e..0cbdcefe 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -7,7 +7,7 @@ use crate::{ error_handlers::{ErrorHandler, LoggingErrorHandler}, }; use core::panic; -use futures::{Future, StreamExt}; +use futures::{stream::FuturesUnordered, Future, StreamExt}; use std::{ fmt::Debug, sync::{ @@ -25,6 +25,7 @@ use teloxide_core::{ }; use tokio::{ sync::{mpsc, Notify}, + task::JoinHandle, time::timeout, }; @@ -51,6 +52,8 @@ pub struct Dispatcher { my_chat_members_queue: Tx, chat_members_queue: Tx, + running_handlers: FuturesUnordered>, + shutdown_state: Arc, shutdown_notify_back: Arc, } @@ -77,6 +80,7 @@ where poll_answers_queue: None, my_chat_members_queue: None, chat_members_queue: None, + running_handlers: FuturesUnordered::new(), shutdown_state: Arc::new(AtomicShutdownState { inner: AtomicU8::new(ShutdownState::IsntRunning as _), }), @@ -86,17 +90,20 @@ where #[must_use] #[allow(clippy::unnecessary_wraps)] - fn new_tx(&self, h: H) -> Tx + fn new_tx(&mut self, h: H) -> Tx where H: DispatcherHandler + Send + 'static, Upd: Send + 'static, R: Send + 'static, { let (tx, rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { + let join_handle = tokio::spawn(async move { let fut = h.handle(rx); fut.await; }); + + self.running_handlers.push(join_handle); + Some(tx) } @@ -111,6 +118,8 @@ where loop { tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); + log::debug!("^C receieved, trying to shutdown dispatcher"); + // If dispatcher wasn't running, then there is nothing to do shutdown_inner(&shutdown_state).ok(); } @@ -240,7 +249,13 @@ where /// /// The default parameters are a long polling update listener and log all /// errors produced by this listener). - pub async fn dispatch(&self) + /// + /// Please note that after shutting down (either because of [`shutdown`], + /// [ctrlc signal], or `update_listener` returning `None`) + /// + /// [`shutdown`]; ShutdownToken::shutdown + /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler + pub async fn dispatch(&mut self) where R: Requester + Clone, ::GetUpdatesFaultTolerant: Send, @@ -254,8 +269,15 @@ where /// Starts your bot with custom `update_listener` and /// `update_listener_error_handler`. + /// + /// Please note that after shutting down (either because of [`shutdown`], + /// [ctrlc signal], or `update_listener` returning `None`) all handlers will + /// be gone. As such, to restart listening you need to re-add handlers. + /// + /// [`shutdown`]; ShutdownToken::shutdown + /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( - &'a self, + &'a mut self, mut update_listener: UListener, update_listener_error_handler: Arc, ) where @@ -305,6 +327,24 @@ where } } + // Drop all senders, so handlers can stop + self.messages_queue.take(); + self.edited_messages_queue.take(); + self.channel_posts_queue.take(); + self.edited_channel_posts_queue.take(); + self.inline_queries_queue.take(); + self.chosen_inline_results_queue.take(); + self.callback_queries_queue.take(); + self.shipping_queries_queue.take(); + self.pre_checkout_queries_queue.take(); + self.polls_queue.take(); + self.poll_answers_queue.take(); + self.my_chat_members_queue.take(); + self.chat_members_queue.take(); + + // Wait untill all handlers process all updates + self.running_handlers.by_ref().for_each(|_| async {}).await; + if let ShuttingDown = self.shutdown_state.load() { // Stopped because of a `shutdown` call. @@ -318,16 +358,6 @@ where self.shutdown_state.store(IsntRunning); } - /// Tries to shutdown dispatching. - /// - /// Returns error if this dispather isn't dispatching at the moment. - /// - /// If you don't need to wait for shutdown, returned future can be ignored. - pub fn shutdown(&self) -> Result + '_, ShutdownError> { - shutdown_inner(&self.shutdown_state) - .map(|()| async move { self.shutdown_notify_back.notified().await }) - } - /// Returns shutdown token, which can later be used to shutdown dispatching. pub fn shutdown_token(&self) -> ShutdownToken { ShutdownToken { From 8e3ef4ab67e796e7f1f196377a38e558b342592d Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 26 Jun 2021 15:08:32 +0300 Subject: [PATCH 19/28] Minor `Dispatcher` cleanup --- src/dispatching/dispatcher.rs | 128 +++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 56 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 0cbdcefe..a26e55c2 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,13 +1,3 @@ -use crate::{ - dispatching::{ - stop_token::StopToken, - update_listeners::{self, UpdateListener}, - DispatcherHandler, UpdateWithCx, - }, - error_handlers::{ErrorHandler, LoggingErrorHandler}, -}; -use core::panic; -use futures::{stream::FuturesUnordered, Future, StreamExt}; use std::{ fmt::Debug, sync::{ @@ -16,6 +6,17 @@ use std::{ }, time::Duration, }; + +use crate::{ + dispatching::{ + stop_token::StopToken, + update_listeners::{self, UpdateListener}, + DispatcherHandler, UpdateWithCx, + }, + error_handlers::{ErrorHandler, LoggingErrorHandler}, +}; + +use futures::{stream::FuturesUnordered, Future, StreamExt}; use teloxide_core::{ requests::Requester, types::{ @@ -81,15 +82,12 @@ where my_chat_members_queue: None, chat_members_queue: None, running_handlers: FuturesUnordered::new(), - shutdown_state: Arc::new(AtomicShutdownState { - inner: AtomicU8::new(ShutdownState::IsntRunning as _), - }), - shutdown_notify_back: Arc::new(Notify::new()), + shutdown_state: <_>::default(), + shutdown_notify_back: <_>::default(), } } #[must_use] - #[allow(clippy::unnecessary_wraps)] fn new_tx(&mut self, h: H) -> Tx where H: DispatcherHandler + Send + 'static, @@ -97,10 +95,7 @@ where R: Send + 'static, { let (tx, rx) = mpsc::unbounded_channel(); - let join_handle = tokio::spawn(async move { - let fut = h.handle(rx); - fut.await; - }); + let join_handle = tokio::spawn(h.handle(rx)); self.running_handlers.push(join_handle); @@ -251,7 +246,8 @@ where /// errors produced by this listener). /// /// Please note that after shutting down (either because of [`shutdown`], - /// [ctrlc signal], or `update_listener` returning `None`) + /// [ctrlc signal], or `update_listener` returning `None`) all handlers will + /// be gone. As such, to restart listening you need to re-add handlers. /// /// [`shutdown`]; ShutdownToken::shutdown /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler @@ -260,11 +256,11 @@ where R: Requester + Clone, ::GetUpdatesFaultTolerant: Send, { - self.dispatch_with_listener( - update_listeners::polling_default(self.requester.clone()).await, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; + let listener = update_listeners::polling_default(self.requester.clone()).await; + let error_handler = + LoggingErrorHandler::with_custom_text("An error from the update listener"); + + self.dispatch_with_listener(listener, error_handler).await; } /// Starts your bot with custom `update_listener` and @@ -288,22 +284,14 @@ where { use ShutdownState::*; - const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); - - // FIXME: replace this by just Duration::ZERO once 1.53 will be released - const DZERO: Duration = Duration::from_secs(0); - - let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); - - // FIXME: replace this by just saturating_add once 1.53 will be released - let shutdown_check_timeout = shutdown_check_timeout - .checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT) - .unwrap_or(shutdown_check_timeout); - + let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let mut stop_token = Some(update_listener.stop_token()); - if let Err(_) = self.shutdown_state.compare_exchange(IsntRunning, Running) { - panic!("Dispatching is already running"); + if let Err(actual) = self.shutdown_state.compare_exchange(IsntRunning, Running) { + unreachable!( + "Dispatching is already running: expected `IsntRunning` state, found `{:?}`", + actual + ); } { @@ -327,23 +315,7 @@ where } } - // Drop all senders, so handlers can stop - self.messages_queue.take(); - self.edited_messages_queue.take(); - self.channel_posts_queue.take(); - self.edited_channel_posts_queue.take(); - self.inline_queries_queue.take(); - self.chosen_inline_results_queue.take(); - self.callback_queries_queue.take(); - self.shipping_queries_queue.take(); - self.pre_checkout_queries_queue.take(); - self.polls_queue.take(); - self.poll_answers_queue.take(); - self.my_chat_members_queue.take(); - self.chat_members_queue.take(); - - // Wait untill all handlers process all updates - self.running_handlers.by_ref().for_each(|_| async {}).await; + self.wait_for_handlers().await; if let ShuttingDown = self.shutdown_state.load() { // Stopped because of a `shutdown` call. @@ -462,8 +434,32 @@ where } } } + + async fn wait_for_handlers(&mut self) { + log::debug!("Waiting for handlers to finish"); + + // Drop all senders, so handlers can stop + self.messages_queue.take(); + self.edited_messages_queue.take(); + self.channel_posts_queue.take(); + self.edited_channel_posts_queue.take(); + self.inline_queries_queue.take(); + self.chosen_inline_results_queue.take(); + self.callback_queries_queue.take(); + self.shipping_queries_queue.take(); + self.pre_checkout_queries_queue.take(); + self.polls_queue.take(); + self.poll_answers_queue.take(); + self.my_chat_members_queue.take(); + self.chat_members_queue.take(); + + // Wait untill all handlers finish + self.running_handlers.by_ref().for_each(|_| async {}).await; + } } +/// A token which can be used to shutdown dispatcher. +#[derive(Clone)] pub struct ShutdownToken { shutdown_state: Arc, shutdown_notify_back: Arc, @@ -481,6 +477,7 @@ impl ShutdownToken { } } +/// Error occured while trying to shutdown dispatcher. #[derive(Debug)] pub enum ShutdownError { IsntRunning, @@ -511,7 +508,14 @@ impl AtomicShutdownState { } } +impl Default for AtomicShutdownState { + fn default() -> Self { + Self { inner: AtomicU8::new(ShutdownState::IsntRunning as _) } + } +} + #[repr(u8)] +#[derive(Debug)] enum ShutdownState { Running, ShuttingDown, @@ -533,6 +537,18 @@ impl ShutdownState { } } +fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { + const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); + + // FIXME: replace this by just Duration::ZERO once 1.53 will be released + const DZERO: Duration = Duration::from_secs(0); + + let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); + + // FIXME: replace this by just saturating_add once 1.53 will be released + shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) +} + fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { use ShutdownState::*; From 49848abd614937f64023e1fa597acc60c79a3826 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 26 Jun 2021 15:33:33 +0300 Subject: [PATCH 20/28] Doc fixes --- src/dispatching/dispatcher.rs | 6 +++--- src/dispatching/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index a26e55c2..820a0f17 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -104,7 +104,7 @@ where /// Setup `^C` handler which [`shutdown`]s dispatching. /// - /// [`shutdown`]: Dispatcher::shutdown + /// [`shutdown`]: ShutdownToken::shutdown #[cfg(feature = "ctrlc_handler")] #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] pub fn setup_ctrlc_handler(self) -> Self { @@ -249,7 +249,7 @@ where /// [ctrlc signal], or `update_listener` returning `None`) all handlers will /// be gone. As such, to restart listening you need to re-add handlers. /// - /// [`shutdown`]; ShutdownToken::shutdown + /// [`shutdown`]: ShutdownToken::shutdown /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch(&mut self) where @@ -270,7 +270,7 @@ where /// [ctrlc signal], or `update_listener` returning `None`) all handlers will /// be gone. As such, to restart listening you need to re-add handlers. /// - /// [`shutdown`]; ShutdownToken::shutdown + /// [`shutdown`]: ShutdownToken::shutdown /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( &'a mut self, diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index ee2adee5..1d68bd41 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -27,7 +27,7 @@ //! that: //! - You are able to supply [`DialogueDispatcher`] as a handler. //! - You are able to supply functions that accept -//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future` //! as a handler. //! //! Since they implement [`DispatcherHandler`] too. @@ -56,7 +56,7 @@ mod dispatcher_handler; mod dispatcher_handler_rx_ext; mod update_with_cx; -pub use dispatcher::Dispatcher; +pub use dispatcher::{Dispatcher, ShutdownError, ShutdownToken}; pub use dispatcher_handler::DispatcherHandler; pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; use tokio::sync::mpsc::UnboundedReceiver; From 1795cb22c2bb243100420c74eee17f9cd8672809 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 26 Jun 2021 17:19:56 +0300 Subject: [PATCH 21/28] setup ^C handler in repls --- Cargo.toml | 2 +- src/dispatching/repls/commands_repl.rs | 3 +++ src/dispatching/repls/dialogues_repl.rs | 3 +++ src/dispatching/repls/repl.rs | 3 +++ 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ec2eba2d..dd93a201 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ authors = [ maintenance = { status = "actively-developed" } [features] -default = ["native-tls", "teloxide-core/default"] +default = ["native-tls", "ctrlc_handler", "teloxide-core/default"] sqlite-storage = ["sqlx"] redis-storage = ["redis"] diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index 88a7c33d..a3625910 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -22,6 +22,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher +#[cfg(feature = "ctrlc_handler")] pub async fn commands_repl(requester: R, bot_name: N, handler: H) where Cmd: BotCommand + Send + 'static, @@ -56,6 +57,7 @@ where /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`commands_repl`]: crate::dispatching::repls::commands_repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +#[cfg(feature = "ctrlc_handler")] pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>( requester: R, bot_name: N, @@ -87,6 +89,7 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, Handl }, ) }) + .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/repls/dialogues_repl.rs b/src/dispatching/repls/dialogues_repl.rs index 0fa00188..755d4dd3 100644 --- a/src/dispatching/repls/dialogues_repl.rs +++ b/src/dispatching/repls/dialogues_repl.rs @@ -23,6 +23,7 @@ use teloxide_core::{requests::Requester, types::Message}; /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage +#[cfg(feature = "ctrlc_handler")] pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H) where H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, @@ -55,6 +56,7 @@ where /// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage +#[cfg(feature = "ctrlc_handler")] pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( requester: R, handler: H, @@ -85,6 +87,7 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( } }, )) + .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index 3c498696..a298e768 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -21,6 +21,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher +#[cfg(feature = "ctrlc_handler")] pub async fn repl(requester: R, handler: H) where H: Fn(UpdateWithCx) -> Fut + Send + Sync + 'static, @@ -51,6 +52,7 @@ where /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`repl`]: crate::dispatching::repls::repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +#[cfg(feature = "ctrlc_handler")] pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>( requester: R, handler: H, @@ -76,6 +78,7 @@ pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>( } }) }) + .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), From 483e544223b569057a78861fcb847d70ecccbbcd Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 26 Jun 2021 23:04:36 +0300 Subject: [PATCH 22/28] Apply suggestions from the review: rename AtomicShutdownState => DispatcherState, IsntRunning => Idle --- src/dispatching/dispatcher.rs | 49 ++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 820a0f17..6a94258b 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -55,7 +55,7 @@ pub struct Dispatcher { running_handlers: FuturesUnordered>, - shutdown_state: Arc, + state: Arc, shutdown_notify_back: Arc, } @@ -82,7 +82,7 @@ where my_chat_members_queue: None, chat_members_queue: None, running_handlers: FuturesUnordered::new(), - shutdown_state: <_>::default(), + state: <_>::default(), shutdown_notify_back: <_>::default(), } } @@ -108,7 +108,7 @@ where #[cfg(feature = "ctrlc_handler")] #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] pub fn setup_ctrlc_handler(self) -> Self { - let shutdown_state = Arc::clone(&self.shutdown_state); + let state = Arc::clone(&self.state); tokio::spawn(async move { loop { tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); @@ -116,7 +116,7 @@ where log::debug!("^C receieved, trying to shutdown dispatcher"); // If dispatcher wasn't running, then there is nothing to do - shutdown_inner(&shutdown_state).ok(); + shutdown_inner(&state).ok(); } }); @@ -287,10 +287,10 @@ where let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let mut stop_token = Some(update_listener.stop_token()); - if let Err(actual) = self.shutdown_state.compare_exchange(IsntRunning, Running) { + if let Err(actual) = self.state.compare_exchange(Idle, Running) { unreachable!( - "Dispatching is already running: expected `IsntRunning` state, found `{:?}`", - actual + "Dispatching is already running: expected `{:?}` state, found `{:?}`", + Idle, actual ); } @@ -306,7 +306,7 @@ where } } - if let ShuttingDown = self.shutdown_state.load() { + if let ShuttingDown = self.state.load() { if let Some(token) = stop_token.take() { log::debug!("Start shutting down dispatching"); token.stop(); @@ -317,7 +317,7 @@ where self.wait_for_handlers().await; - if let ShuttingDown = self.shutdown_state.load() { + if let ShuttingDown = self.state.load() { // Stopped because of a `shutdown` call. // Notify `shutdown`s that we finished @@ -327,13 +327,13 @@ where log::debug!("Dispatching stopped (listener returned `None`)"); } - self.shutdown_state.store(IsntRunning); + self.state.store(Idle); } /// Returns shutdown token, which can later be used to shutdown dispatching. pub fn shutdown_token(&self) -> ShutdownToken { ShutdownToken { - shutdown_state: Arc::clone(&self.shutdown_state), + dispatcher_state: Arc::clone(&self.state), shutdown_notify_back: Arc::clone(&self.shutdown_notify_back), } } @@ -461,7 +461,7 @@ where /// A token which can be used to shutdown dispatcher. #[derive(Clone)] pub struct ShutdownToken { - shutdown_state: Arc, + dispatcher_state: Arc, shutdown_notify_back: Arc, } @@ -472,7 +472,7 @@ impl ShutdownToken { /// /// If you don't need to wait for shutdown, returned future can be ignored. pub fn shutdown(&self) -> Result + '_, ShutdownError> { - shutdown_inner(&self.shutdown_state) + shutdown_inner(&self.dispatcher_state) .map(|()| async move { self.shutdown_notify_back.notified().await }) } } @@ -480,14 +480,15 @@ impl ShutdownToken { /// Error occured while trying to shutdown dispatcher. #[derive(Debug)] pub enum ShutdownError { - IsntRunning, + /// Couldn"t stop dispatcher since it wasn't running. + Idle, } -struct AtomicShutdownState { +struct DispatcherState { inner: AtomicU8, } -impl AtomicShutdownState { +impl DispatcherState { fn load(&self) -> ShutdownState { ShutdownState::from_u8(self.inner.load(Ordering::SeqCst)) } @@ -508,9 +509,9 @@ impl AtomicShutdownState { } } -impl Default for AtomicShutdownState { +impl Default for DispatcherState { fn default() -> Self { - Self { inner: AtomicU8::new(ShutdownState::IsntRunning as _) } + Self { inner: AtomicU8::new(ShutdownState::Idle as _) } } } @@ -519,19 +520,19 @@ impl Default for AtomicShutdownState { enum ShutdownState { Running, ShuttingDown, - IsntRunning, + Idle, } impl ShutdownState { fn from_u8(n: u8) -> Self { const RUNNING: u8 = ShutdownState::Running as u8; const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8; - const ISNT_RUNNING: u8 = ShutdownState::IsntRunning as u8; + const IDLE: u8 = ShutdownState::Idle as u8; match n { RUNNING => ShutdownState::Running, SHUTTING_DOWN => ShutdownState::ShuttingDown, - ISNT_RUNNING => ShutdownState::IsntRunning, + IDLE => ShutdownState::Idle, _ => unreachable!(), } } @@ -549,14 +550,14 @@ fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Du shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) } -fn shutdown_inner(shutdown_state: &AtomicShutdownState) -> Result<(), ShutdownError> { +fn shutdown_inner(state: &DispatcherState) -> Result<(), ShutdownError> { use ShutdownState::*; - let res = shutdown_state.compare_exchange(Running, ShuttingDown); + let res = state.compare_exchange(Running, ShuttingDown); match res { Ok(_) | Err(ShuttingDown) => Ok(()), - Err(IsntRunning) => Err(ShutdownError::IsntRunning), + Err(Idle) => Err(ShutdownError::Idle), Err(Running) => unreachable!(), } } From 0347f9e62744612b1f80a0a9ba81030c1143a6a0 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 26 Jun 2021 23:21:44 +0300 Subject: [PATCH 23/28] Replace `ShutdownError` with `()` --- src/dispatching/dispatcher.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 6a94258b..763c2c2e 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -468,22 +468,15 @@ pub struct ShutdownToken { impl ShutdownToken { /// Tries to shutdown dispatching. /// - /// Returns error if this dispather isn't dispatching at the moment. + /// Returns error if this dispather is idle at the moment. /// /// If you don't need to wait for shutdown, returned future can be ignored. - pub fn shutdown(&self) -> Result + '_, ShutdownError> { + pub fn shutdown(&self) -> Result + '_, ()> { shutdown_inner(&self.dispatcher_state) .map(|()| async move { self.shutdown_notify_back.notified().await }) } } -/// Error occured while trying to shutdown dispatcher. -#[derive(Debug)] -pub enum ShutdownError { - /// Couldn"t stop dispatcher since it wasn't running. - Idle, -} - struct DispatcherState { inner: AtomicU8, } From 558e7d5a7263529ef3660c0de5acfb6f2adfea36 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sun, 27 Jun 2021 10:50:47 +0300 Subject: [PATCH 24/28] Add `IdleShutdownError` --- src/dispatching/dispatcher.rs | 21 +++++++++++++++++---- src/dispatching/mod.rs | 2 +- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 763c2c2e..88c3aa1b 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,5 +1,5 @@ use std::{ - fmt::Debug, + fmt::{self, Debug}, sync::{ atomic::{AtomicU8, Ordering}, Arc, @@ -458,6 +458,19 @@ where } } +/// This error is returned from [`ShutdownToken::shutdown`] when trying to +/// shutdown idle dispatcher. +#[derive(Debug)] +pub struct IdleShutdownError; + +impl fmt::Display for IdleShutdownError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Dispatcher was idle and as such couldn't be shut down") + } +} + +impl std::error::Error for IdleShutdownError {} + /// A token which can be used to shutdown dispatcher. #[derive(Clone)] pub struct ShutdownToken { @@ -471,7 +484,7 @@ impl ShutdownToken { /// Returns error if this dispather is idle at the moment. /// /// If you don't need to wait for shutdown, returned future can be ignored. - pub fn shutdown(&self) -> Result + '_, ()> { + pub fn shutdown(&self) -> Result + '_, IdleShutdownError> { shutdown_inner(&self.dispatcher_state) .map(|()| async move { self.shutdown_notify_back.notified().await }) } @@ -543,14 +556,14 @@ fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Du shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) } -fn shutdown_inner(state: &DispatcherState) -> Result<(), ShutdownError> { +fn shutdown_inner(state: &DispatcherState) -> Result<(), IdleShutdownError> { use ShutdownState::*; let res = state.compare_exchange(Running, ShuttingDown); match res { Ok(_) | Err(ShuttingDown) => Ok(()), - Err(Idle) => Err(ShutdownError::Idle), + Err(Idle) => Err(IdleShutdownError), Err(Running) => unreachable!(), } } diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index 1d68bd41..9936de96 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -56,7 +56,7 @@ mod dispatcher_handler; mod dispatcher_handler_rx_ext; mod update_with_cx; -pub use dispatcher::{Dispatcher, ShutdownError, ShutdownToken}; +pub use dispatcher::{Dispatcher, IdleShutdownError, ShutdownToken}; pub use dispatcher_handler::DispatcherHandler; pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; use tokio::sync::mpsc::UnboundedReceiver; From 8796bc11baf0d83dd2af18e2515fe29fc2b6dece Mon Sep 17 00:00:00 2001 From: Waffle Date: Sun, 27 Jun 2021 11:04:55 +0300 Subject: [PATCH 25/28] Update changelog --- CHANGELOG.md | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a7b64c0..4ff0cb6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Storage::get_dialogue` to obtain a dialogue indexed by a chat ID. - `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`. - `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`. - - `Dispatcher::shutdown` function. + - A way to `shutdown` dispatcher + - `Dispatcher::shutdown_token` function. + - `ShutdownToken` with a `shutdown` function. - `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)). + - `IdleShutdownError` ### Changed @@ -22,11 +25,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). - `polling` and `polling_default` now require `R: 'static` - Refactor `UpdateListener` trait: - - Add a `stop` function that allows stopping the listener ([issue 166](https://github.com/teloxide/teloxide/issues/166)). + - Add a `StopToken` associated type. + - It must implement a new `StopToken` trait which has the only function `fn stop(self);` + - Add a `stop_token` function that returns `Self::StopToken` and allows stopping the listener later ([issue 166](https://github.com/teloxide/teloxide/issues/166)). - Remove blanked implementation. - Remove `Stream` from super traits. - Add `AsUpdateStream` to super traits. - Add an `AsUpdateStream` trait that allows turning implementors into streams of updates (GAT workaround). + - Add a `timeout_hint` function (with a default implementation). + - `Dispatcher::dispatch` and `Dispatcher::dispatch_with_listener` now require mutable reference to self. + - Repls can now be stopped by `^C` signal. + - `Noop` and `AsyncStopToken`stop tokens. + - `StatefulListener`. ### Fixed From afe5a9716b0e21c903cbf1b3bdf04384012b521a Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Sun, 27 Jun 2021 14:57:02 +0600 Subject: [PATCH 26/28] Enhance the docs --- src/dispatching/dispatcher.rs | 28 +++++++++++-------- src/dispatching/stop_token.rs | 10 ++++--- src/dispatching/update_listeners.rs | 18 ++++++------ .../update_listeners/stateful_listener.rs | 20 ++++++------- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 88c3aa1b..5dd64783 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -102,7 +102,7 @@ where Some(tx) } - /// Setup `^C` handler which [`shutdown`]s dispatching. + /// Setup the `^C` handler which [`shutdown`]s dispatching. /// /// [`shutdown`]: ShutdownToken::shutdown #[cfg(feature = "ctrlc_handler")] @@ -246,11 +246,12 @@ where /// errors produced by this listener). /// /// Please note that after shutting down (either because of [`shutdown`], - /// [ctrlc signal], or `update_listener` returning `None`) all handlers will - /// be gone. As such, to restart listening you need to re-add handlers. + /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers + /// will be gone. As such, to restart listening you need to re-add + /// handlers. /// /// [`shutdown`]: ShutdownToken::shutdown - /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler + /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch(&mut self) where R: Requester + Clone, @@ -267,11 +268,12 @@ where /// `update_listener_error_handler`. /// /// Please note that after shutting down (either because of [`shutdown`], - /// [ctrlc signal], or `update_listener` returning `None`) all handlers will - /// be gone. As such, to restart listening you need to re-add handlers. + /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers + /// will be gone. As such, to restart listening you need to re-add + /// handlers. /// /// [`shutdown`]: ShutdownToken::shutdown - /// [ctrlc signal]: Dispatcher::setup_ctrlc_handler + /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( &'a mut self, mut update_listener: UListener, @@ -330,7 +332,8 @@ where self.state.store(Idle); } - /// Returns shutdown token, which can later be used to shutdown dispatching. + /// Returns a shutdown token, which can later be used to shutdown + /// dispatching. pub fn shutdown_token(&self) -> ShutdownToken { ShutdownToken { dispatcher_state: Arc::clone(&self.state), @@ -459,7 +462,7 @@ where } /// This error is returned from [`ShutdownToken::shutdown`] when trying to -/// shutdown idle dispatcher. +/// shutdown an idle [`Dispatcher`]. #[derive(Debug)] pub struct IdleShutdownError; @@ -471,7 +474,7 @@ impl fmt::Display for IdleShutdownError { impl std::error::Error for IdleShutdownError {} -/// A token which can be used to shutdown dispatcher. +/// A token which used to shutdown [`Dispatcher`]. #[derive(Clone)] pub struct ShutdownToken { dispatcher_state: Arc, @@ -481,9 +484,10 @@ pub struct ShutdownToken { impl ShutdownToken { /// Tries to shutdown dispatching. /// - /// Returns error if this dispather is idle at the moment. + /// Returns an error if the dispatcher is idle at the moment. /// - /// If you don't need to wait for shutdown, returned future can be ignored. + /// If you don't need to wait for shutdown, the returned future can be + /// ignored. pub fn shutdown(&self) -> Result + '_, IdleShutdownError> { shutdown_inner(&self.dispatcher_state) .map(|()| async move { self.shutdown_notify_back.notified().await }) diff --git a/src/dispatching/stop_token.rs b/src/dispatching/stop_token.rs index faea1ed2..4198d4c0 100644 --- a/src/dispatching/stop_token.rs +++ b/src/dispatching/stop_token.rs @@ -1,8 +1,10 @@ +//! A stop token used to stop a listener. + use std::{future::Future, pin::Pin, task}; use futures::future::{pending, AbortHandle, Abortable, Pending}; -/// A stop token allows you to stop listener. +/// A stop token allows you to stop a listener. /// /// See also: [`UpdateListener::stop_token`]. /// @@ -27,8 +29,8 @@ pub struct AsyncStopToken(AbortHandle); /// A flag which corresponds to [`AsyncStopToken`]. /// -/// To know if stop token was used you can either repeatedly call [`is_stopped`] -/// or use this type as a `Future`. +/// To know if the stop token was used you can either repeatedly call +/// [`is_stopped`] or use this type as a `Future`. /// /// [`is_stopped`]: AsyncStopFlag::is_stopped #[pin_project::pin_project] @@ -52,7 +54,7 @@ impl StopToken for AsyncStopToken { } impl AsyncStopFlag { - /// Returns true if stop token linked to `self` was used. + /// Returns true if the stop token linked to `self` was used. pub fn is_stopped(&self) -> bool { self.0.is_aborted() } diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index b8448023..3e2b1316 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -127,21 +127,19 @@ pub use self::{ /// /// Some functions of this trait are located in the supertrait /// ([`AsUpdateStream`]), see also: -/// - [`Stream`] -/// - [`as_stream`] +/// - [`AsUpdateStream::Stream`] +/// - [`AsUpdateStream::as_stream`] /// /// [polling]: self#long-polling /// [webhooks]: self#webhooks -/// [`Stream`]: AsUpdateStream::Stream -/// [`as_stream`]: AsUpdateStream::as_stream pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { - /// Type of token which allows ti stop this listener. + /// The type of token which allows to stop this listener. type StopToken: StopToken; /// Returns a token which stops this listener. /// /// The [`stop`] function of the token is not guaranteed to have an - /// immediate effect. That is some listeners can return updates even + /// immediate effect. That is, some listeners can return updates even /// after [`stop`] is called (e.g.: because of buffering). /// /// [`stop`]: StopToken::stop @@ -153,15 +151,15 @@ pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { the returned token"] fn stop_token(&mut self) -> Self::StopToken; - /// Timeout duration hint. + /// The timeout duration hint. /// - /// This hints how often dispatcher should check for shutdown. E.g. for + /// This hints how often dispatcher should check for a shutdown. E.g., for /// [`polling()`] this returns the [`timeout`]. /// /// [`timeout`]: crate::payloads::GetUpdates::timeout /// /// If you are implementing this trait and not sure what to return from this - /// function, just leave it with default implementation. + /// function, just leave it with the default implementation. fn timeout_hint(&self) -> Option { None } @@ -171,7 +169,7 @@ pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { /// /// This trait is a workaround to not require GAT. pub trait AsUpdateStream<'a, E> { - /// Stream of updates from Telegram. + /// The stream of updates from Telegram. type Stream: Stream> + 'a; /// Creates the update [`Stream`]. diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs index b7a243c6..ac1f8655 100644 --- a/src/dispatching/update_listeners/stateful_listener.rs +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -10,10 +10,10 @@ use crate::dispatching::{ /// A listener created from functions. /// -/// This type allows to turn a stream of updates (+some additional functions) +/// This type allows to turn a stream of updates (+ some additional functions) /// into an [`UpdateListener`]. /// -/// For an example of usage see [`polling`] +/// For an example of usage, see [`polling`]. /// /// [`polling`]: crate::dispatching::update_listeners::polling() #[non_exhaustive] @@ -21,18 +21,18 @@ pub struct StatefulListener { /// The state of the listener. pub state: St, - /// Function used as [`AsUpdateStream::as_stream`]. + /// The function used as [`AsUpdateStream::as_stream`]. /// /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by /// `&mut`. pub stream: Assf, - /// Function used as [`UpdateListener::stop_token`]. + /// The function used as [`UpdateListener::stop_token`]. /// /// Must be of type `for<'a> &'a mut St -> impl StopToken`. pub stop_token: Sf, - /// Function used as [`UpdateListener::timeout_hint`]. + /// The function used as [`UpdateListener::timeout_hint`]. /// /// Must be of type `for<'a> &'a St -> Option` and callable by /// `&`. @@ -40,14 +40,14 @@ pub struct StatefulListener { } impl StatefulListener fn(&'a St) -> Option> { - /// Creates new stateful listener from it's components. + /// Creates a new stateful listener from its components. pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self { Self { state, stream, stop_token, timeout_hint: None } } } impl StatefulListener { - /// Creates new stateful listener from it's components. + /// Creates a new stateful listener from its components. pub fn new_with_timeout_hint( state: St, stream: Assf, @@ -68,10 +68,10 @@ impl where S: Stream> + Unpin + 'static, { - /// Creates a new update listner from a stream of updates which ignore stop - /// signals. + /// Creates a new update listener from a stream of updates which ignores + /// stop signals. /// - /// It won't be possible to ever stop this listener with stop token. + /// It won't be possible to ever stop this listener with a stop token. pub fn from_stream_without_graceful_shutdown(stream: S) -> Self { let this = Self { state: stream, From a6c480930a31aabb1bbb54e9011a6687c22fa724 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Sun, 27 Jun 2021 15:44:46 +0600 Subject: [PATCH 27/28] Print info about graceful shutdown to users --- CHANGELOG.md | 1 + src/dispatching/dispatcher.rs | 8 ++++---- src/logging.rs | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff0cb6c..9ce019ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Repls can now be stopped by `^C` signal. - `Noop` and `AsyncStopToken`stop tokens. - `StatefulListener`. + - Emit not only errors but also warnings and general information from teloxide, when set up by `enable_logging!`. ### Fixed diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 5dd64783..5f1243cb 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -113,7 +113,7 @@ where loop { tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); - log::debug!("^C receieved, trying to shutdown dispatcher"); + log::info!("^C received, trying to shutdown the dispatcher..."); // If dispatcher wasn't running, then there is nothing to do shutdown_inner(&state).ok(); @@ -310,7 +310,7 @@ where if let ShuttingDown = self.state.load() { if let Some(token) = stop_token.take() { - log::debug!("Start shutting down dispatching"); + log::debug!("Start shutting down dispatching..."); token.stop(); } } @@ -324,9 +324,9 @@ where // Notify `shutdown`s that we finished self.shutdown_notify_back.notify_waiters(); - log::debug!("Dispatching shut down"); + log::info!("Dispatching has been shut down."); } else { - log::debug!("Dispatching stopped (listener returned `None`)"); + log::debug!("Dispatching has been stopped (listener returned `None`)."); } self.state.store(Idle); diff --git a/src/logging.rs b/src/logging.rs index a1a7e11d..4dddac89 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,7 +1,7 @@ /// Enables logging through [pretty-env-logger]. /// -/// A logger will **only** print errors from teloxide and **all** logs from -/// your program. +/// A logger will **only** print errors, warnings, and general information from +/// teloxide and **all** logs from your program. /// /// # Example /// ```no_compile @@ -46,7 +46,7 @@ macro_rules! enable_logging_with_filter { pretty_env_logger::formatted_builder() .write_style(pretty_env_logger::env_logger::WriteStyle::Auto) .filter(Some(&env!("CARGO_PKG_NAME").replace("-", "_")), $filter) - .filter(Some("teloxide"), log::LevelFilter::Error) + .filter(Some("teloxide"), log::LevelFilter::Info) .init(); }; } From c5ac8b5f9c2873fe2d4bad8c88036dcdf43d51cb Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Sun, 27 Jun 2021 16:36:36 +0600 Subject: [PATCH 28/28] Emit graceful shutdown info in all cases --- src/dispatching/dispatcher.rs | 8 +++++--- src/logging.rs | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 5f1243cb..2deb38e2 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -326,7 +326,7 @@ where self.shutdown_notify_back.notify_waiters(); log::info!("Dispatching has been shut down."); } else { - log::debug!("Dispatching has been stopped (listener returned `None`)."); + log::info!("Dispatching has been stopped (listener returned `None`)."); } self.state.store(Idle); @@ -489,8 +489,10 @@ impl ShutdownToken { /// If you don't need to wait for shutdown, the returned future can be /// ignored. pub fn shutdown(&self) -> Result + '_, IdleShutdownError> { - shutdown_inner(&self.dispatcher_state) - .map(|()| async move { self.shutdown_notify_back.notified().await }) + shutdown_inner(&self.dispatcher_state).map(|()| async move { + log::info!("Trying to shutdown the dispatcher..."); + self.shutdown_notify_back.notified().await + }) } } diff --git a/src/logging.rs b/src/logging.rs index 4dddac89..fd7e5192 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -23,8 +23,8 @@ macro_rules! enable_logging { /// Enables logging through [pretty-env-logger] with a custom filter for your /// program. /// -/// A logger will **only** print errors from teloxide and restrict logs from -/// your program by the specified filter. +/// A logger will **only** print errors, warnings, and general information from +/// teloxide and restrict logs from your program by the specified filter. /// /// # Example /// Allow printing all logs from your program up to [`LevelFilter::Debug`] (i.e.