From faef0c6ffbebce34c6b4fadff68d10a5ef437ecf Mon Sep 17 00:00:00 2001 From: Waffle <waffle.lapkin@gmail.com> Date: Sun, 27 Jun 2021 14:29:17 +0300 Subject: [PATCH 1/2] Implement automatic update filtering Add `UpdateListener::hint_allowed_updates` and `StatefulListener::hint_allowed_updates`. Make `Dispatcher` call `UpdateListener::hint_allowed_updates` when starting dispatching. --- CHANGELOG.md | 1 + src/dispatching/dispatcher.rs | 67 ++++++++++++++++++- src/dispatching/update_listeners.rs | 20 +++++- src/dispatching/update_listeners/polling.rs | 8 ++- .../update_listeners/stateful_listener.rs | 61 +++++++++++------ 5 files changed, 134 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ce019ad..07131416 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `ShutdownToken` with a `shutdown` function. - `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)). - `IdleShutdownError` + - Automatic update filtering ([issue 389](https://github.com/teloxide/teloxide/issues/389)). ### Changed diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 5f1243cb..4c3e2a47 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -20,8 +20,8 @@ use futures::{stream::FuturesUnordered, Future, StreamExt}; use teloxide_core::{ requests::Requester, types::{ - CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, - PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, + AllowedUpdate, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, + Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, }, }; use tokio::{ @@ -286,6 +286,8 @@ where { use ShutdownState::*; + self.hint_allowed_updates(&mut update_listener); + let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let mut stop_token = Some(update_listener.stop_token()); @@ -438,6 +440,67 @@ where } } + fn hint_allowed_updates<E>(&self, listener: &mut impl UpdateListener<E>) { + let mut allowed = self + .messages_queue + .as_ref() + .map(|_| AllowedUpdate::Message) + .into_iter() + .chain( + self.edited_messages_queue + .as_ref() + .map(|_| AllowedUpdate::EditedMessage) + .into_iter(), + ) + .chain( + self.channel_posts_queue.as_ref().map(|_| AllowedUpdate::ChannelPost).into_iter(), + ) + .chain( + self.edited_channel_posts_queue + .as_ref() + .map(|_| AllowedUpdate::EditedChannelPost) + .into_iter(), + ) + .chain( + self.inline_queries_queue.as_ref().map(|_| AllowedUpdate::InlineQuery).into_iter(), + ) + .chain( + self.chosen_inline_results_queue + .as_ref() + .map(|_| AllowedUpdate::ChosenInlineResult) + .into_iter(), + ) + .chain( + self.callback_queries_queue + .as_ref() + .map(|_| AllowedUpdate::CallbackQuery) + .into_iter(), + ) + .chain( + self.shipping_queries_queue + .as_ref() + .map(|_| AllowedUpdate::ShippingQuery) + .into_iter(), + ) + .chain( + self.pre_checkout_queries_queue + .as_ref() + .map(|_| AllowedUpdate::PreCheckoutQuery) + .into_iter(), + ) + .chain(self.polls_queue.as_ref().map(|_| AllowedUpdate::Poll).into_iter()) + .chain(self.poll_answers_queue.as_ref().map(|_| AllowedUpdate::PollAnswer).into_iter()) + .chain( + self.my_chat_members_queue + .as_ref() + .map(|_| AllowedUpdate::MyChatMember) + .into_iter(), + ) + .chain(self.chat_members_queue.as_ref().map(|_| AllowedUpdate::ChatMember).into_iter()); + + listener.hint_allowed_updates(&mut allowed); + } + async fn wait_for_handlers(&mut self) { log::debug!("Waiting for handlers to finish"); diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 3e2b1316..a0d9edca 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -107,7 +107,10 @@ use futures::Stream; use std::time::Duration; -use crate::{dispatching::stop_token::StopToken, types::Update}; +use crate::{ + dispatching::stop_token::StopToken, + types::{AllowedUpdate, Update}, +}; mod polling; mod stateful_listener; @@ -151,6 +154,21 @@ pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> { the returned token"] fn stop_token(&mut self) -> Self::StopToken; + /// Hint which updates should the listener listen for. + /// + /// For example [`polling()`] should send the hint as + /// [`GetUpdates::allowed_updates`] + /// + /// Note however that this is a _hint_ and as such, it can be ignored. The + /// listener is not guaranteed to only return updates which types are listed + /// in the hint. + /// + /// [`GetUpdates::allowed_updates`]: + /// crate::payloads::GetUpdates::allowed_updates + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) { + let _ = hint; + } + /// The timeout duration hint. /// /// This hints how often dispatcher should check for a shutdown. E.g., for diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 50f1626c..5440fd6a 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -146,9 +146,15 @@ where let stop_token = |st: &mut State<_>| st.token.clone(); + let hint_allowed_updates = + Some(|state: &mut State<_>, allowed: &mut dyn Iterator<Item = AllowedUpdate>| { + // TODO: we should probably warn if there already were different allowed updates + // before + state.allowed_updates = Some(allowed.collect()); + }); let timeout_hint = Some(move |_: &State<_>| timeout); - StatefulListener { state, stream, stop_token, timeout_hint } + StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint) } async fn delete_webhook_if_setup<R>(requester: &R) diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs index ac1f8655..a9c26576 100644 --- a/src/dispatching/update_listeners/stateful_listener.rs +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -1,11 +1,13 @@ use std::time::Duration; use futures::Stream; -use teloxide_core::types::Update; -use crate::dispatching::{ - stop_token::{self, StopToken}, - update_listeners::{AsUpdateStream, UpdateListener}, +use crate::{ + dispatching::{ + stop_token::{self, StopToken}, + update_listeners::{AsUpdateStream, UpdateListener}, + }, + types::{AllowedUpdate, Update}, }; /// A listener created from functions. @@ -17,7 +19,7 @@ use crate::dispatching::{ /// /// [`polling`]: crate::dispatching::update_listeners::polling() #[non_exhaustive] -pub struct StatefulListener<St, Assf, Sf, Thf> { +pub struct StatefulListener<St, Assf, Sf, Hauf, Thf> { /// The state of the listener. pub state: St, @@ -32,6 +34,12 @@ pub struct StatefulListener<St, Assf, Sf, Thf> { /// Must be of type `for<'a> &'a mut St -> impl StopToken`. pub stop_token: Sf, + /// The function used as [`UpdateListener::hint_allowed_updates`]. + /// + /// Must be of type `for<'a, 'b> &'a mut St, &'b mut dyn Iterator<Item = + /// AllowedUpdate> -> ()`. + pub hint_allowed_updates: Option<Hauf>, + /// The function used as [`UpdateListener::timeout_hint`]. /// /// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by @@ -39,22 +47,26 @@ pub struct StatefulListener<St, Assf, Sf, Thf> { pub timeout_hint: Option<Thf>, } -impl<St, Assf, Sf> StatefulListener<St, Assf, Sf, for<'a> fn(&'a St) -> Option<Duration>> { +type Haufn<State> = for<'a, 'b> fn(&'a mut State, &'b mut dyn Iterator<Item = AllowedUpdate>); +type Thfn<State> = for<'a> fn(&'a State) -> Option<Duration>; + +impl<St, Assf, Sf> StatefulListener<St, Assf, Sf, Haufn<St>, Thfn<St>> { /// Creates a new stateful listener from its components. pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self { - Self { state, stream, stop_token, timeout_hint: None } + Self::new_with_hints(state, stream, stop_token, None, None) } } -impl<St, Assf, Sf, Thf> StatefulListener<St, Assf, Sf, Thf> { +impl<St, Assf, Sf, Hauf, Thf> StatefulListener<St, Assf, Sf, Hauf, Thf> { /// Creates a new stateful listener from its components. - pub fn new_with_timeout_hint( + pub fn new_with_hints( state: St, stream: Assf, stop_token: Sf, + hint_allowed_updates: Option<Hauf>, timeout_hint: Option<Thf>, ) -> Self { - Self { state, stream, stop_token, timeout_hint } + Self { state, stream, stop_token, hint_allowed_updates, timeout_hint } } } @@ -63,7 +75,8 @@ impl<S, E> S, for<'a> fn(&'a mut S) -> &'a mut S, for<'a> fn(&'a mut S) -> stop_token::Noop, - for<'a> fn(&'a S) -> Option<Duration>, + Haufn<S>, + Thfn<S>, > where S: Stream<Item = Result<Update, E>> + Unpin + 'static, @@ -73,11 +86,12 @@ where /// /// It won't be possible to ever stop this listener with a stop token. pub fn from_stream_without_graceful_shutdown(stream: S) -> Self { - let this = Self { - state: stream, - stream: |s| s, - stop_token: |_| stop_token::Noop, - timeout_hint: Some(|_| { + let this = Self::new_with_hints( + stream, + |s| s, + |_| stop_token::Noop, + None, + Some(|_| { // FIXME: replace this by just Duration::MAX once 1.53 releases // be released const NANOS_PER_SEC: u32 = 1_000_000_000; @@ -85,13 +99,14 @@ where Some(dmax) }), - }; + ); assert_update_listener(this) } } -impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf> +impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a, E> + for StatefulListener<St, Assf, Hauf, Sf, Thf> where (St, Strm): 'a, Assf: FnMut(&'a mut St) -> Strm, @@ -104,11 +119,13 @@ where } } -impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf> +impl<St, Assf, Sf, Hauf, Stt, Thf, E> UpdateListener<E> + for StatefulListener<St, Assf, Sf, Hauf, Thf> where Self: for<'a> AsUpdateStream<'a, E>, Sf: FnMut(&mut St) -> Stt, Stt: StopToken, + Hauf: FnMut(&mut St, &mut dyn Iterator<Item = AllowedUpdate>), Thf: Fn(&St) -> Option<Duration>, { type StopToken = Stt; @@ -117,6 +134,12 @@ where (self.stop_token)(&mut self.state) } + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) { + if let Some(f) = &mut self.hint_allowed_updates { + f(&mut self.state, hint); + } + } + fn timeout_hint(&self) -> Option<Duration> { self.timeout_hint.as_ref().and_then(|f| f(&self.state)) } From 7859294ff63c0eae239fca7c722fd16fe20a26ef Mon Sep 17 00:00:00 2001 From: Waffle <waffle.lapkin@gmail.com> Date: Sun, 27 Jun 2021 15:52:23 +0300 Subject: [PATCH 2/2] Apply suggestion from the review --- src/dispatching/dispatcher.rs | 106 ++++++++++++++++------------------ 1 file changed, 50 insertions(+), 56 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 4c3e2a47..4137e95f 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -441,62 +441,56 @@ where } fn hint_allowed_updates<E>(&self, listener: &mut impl UpdateListener<E>) { - let mut allowed = self - .messages_queue - .as_ref() - .map(|_| AllowedUpdate::Message) - .into_iter() - .chain( - self.edited_messages_queue - .as_ref() - .map(|_| AllowedUpdate::EditedMessage) - .into_iter(), - ) - .chain( - self.channel_posts_queue.as_ref().map(|_| AllowedUpdate::ChannelPost).into_iter(), - ) - .chain( - self.edited_channel_posts_queue - .as_ref() - .map(|_| AllowedUpdate::EditedChannelPost) - .into_iter(), - ) - .chain( - self.inline_queries_queue.as_ref().map(|_| AllowedUpdate::InlineQuery).into_iter(), - ) - .chain( - self.chosen_inline_results_queue - .as_ref() - .map(|_| AllowedUpdate::ChosenInlineResult) - .into_iter(), - ) - .chain( - self.callback_queries_queue - .as_ref() - .map(|_| AllowedUpdate::CallbackQuery) - .into_iter(), - ) - .chain( - self.shipping_queries_queue - .as_ref() - .map(|_| AllowedUpdate::ShippingQuery) - .into_iter(), - ) - .chain( - self.pre_checkout_queries_queue - .as_ref() - .map(|_| AllowedUpdate::PreCheckoutQuery) - .into_iter(), - ) - .chain(self.polls_queue.as_ref().map(|_| AllowedUpdate::Poll).into_iter()) - .chain(self.poll_answers_queue.as_ref().map(|_| AllowedUpdate::PollAnswer).into_iter()) - .chain( - self.my_chat_members_queue - .as_ref() - .map(|_| AllowedUpdate::MyChatMember) - .into_iter(), - ) - .chain(self.chat_members_queue.as_ref().map(|_| AllowedUpdate::ChatMember).into_iter()); + fn hint_handler_allowed_update<T>( + queue: &Option<T>, + kind: AllowedUpdate, + ) -> std::option::IntoIter<AllowedUpdate> { + queue.as_ref().map(|_| kind).into_iter() + } + + let mut allowed = hint_handler_allowed_update(&self.messages_queue, AllowedUpdate::Message) + .chain(hint_handler_allowed_update( + &self.edited_messages_queue, + AllowedUpdate::EditedMessage, + )) + .chain(hint_handler_allowed_update( + &self.channel_posts_queue, + AllowedUpdate::ChannelPost, + )) + .chain(hint_handler_allowed_update( + &self.edited_channel_posts_queue, + AllowedUpdate::EditedChannelPost, + )) + .chain(hint_handler_allowed_update( + &self.inline_queries_queue, + AllowedUpdate::InlineQuery, + )) + .chain(hint_handler_allowed_update( + &self.chosen_inline_results_queue, + AllowedUpdate::ChosenInlineResult, + )) + .chain(hint_handler_allowed_update( + &self.callback_queries_queue, + AllowedUpdate::CallbackQuery, + )) + .chain(hint_handler_allowed_update( + &self.shipping_queries_queue, + AllowedUpdate::ShippingQuery, + )) + .chain(hint_handler_allowed_update( + &self.pre_checkout_queries_queue, + AllowedUpdate::PreCheckoutQuery, + )) + .chain(hint_handler_allowed_update(&self.polls_queue, AllowedUpdate::Poll)) + .chain(hint_handler_allowed_update(&self.poll_answers_queue, AllowedUpdate::PollAnswer)) + .chain(hint_handler_allowed_update( + &self.my_chat_members_queue, + AllowedUpdate::MyChatMember, + )) + .chain(hint_handler_allowed_update( + &self.chat_members_queue, + AllowedUpdate::ChatMember, + )); listener.hint_allowed_updates(&mut allowed); }