Implement automatic update filtering

Add `UpdateListener::hint_allowed_updates` and `StatefulListener::hint_allowed_updates`.

Make `Dispatcher` call `UpdateListener::hint_allowed_updates` when starting dispatching.
This commit is contained in:
Waffle 2021-06-27 14:29:17 +03:00
parent a6c480930a
commit faef0c6ffb
5 changed files with 134 additions and 23 deletions

View file

@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `ShutdownToken` with a `shutdown` function.
- `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)).
- `IdleShutdownError`
- Automatic update filtering ([issue 389](https://github.com/teloxide/teloxide/issues/389)).
### Changed

View file

@ -20,8 +20,8 @@ use futures::{stream::FuturesUnordered, Future, StreamExt};
use teloxide_core::{
requests::Requester,
types::{
CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll,
PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind,
AllowedUpdate, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message,
Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind,
},
};
use tokio::{
@ -286,6 +286,8 @@ where
{
use ShutdownState::*;
self.hint_allowed_updates(&mut update_listener);
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token());
@ -438,6 +440,67 @@ where
}
}
fn hint_allowed_updates<E>(&self, listener: &mut impl UpdateListener<E>) {
let mut allowed = self
.messages_queue
.as_ref()
.map(|_| AllowedUpdate::Message)
.into_iter()
.chain(
self.edited_messages_queue
.as_ref()
.map(|_| AllowedUpdate::EditedMessage)
.into_iter(),
)
.chain(
self.channel_posts_queue.as_ref().map(|_| AllowedUpdate::ChannelPost).into_iter(),
)
.chain(
self.edited_channel_posts_queue
.as_ref()
.map(|_| AllowedUpdate::EditedChannelPost)
.into_iter(),
)
.chain(
self.inline_queries_queue.as_ref().map(|_| AllowedUpdate::InlineQuery).into_iter(),
)
.chain(
self.chosen_inline_results_queue
.as_ref()
.map(|_| AllowedUpdate::ChosenInlineResult)
.into_iter(),
)
.chain(
self.callback_queries_queue
.as_ref()
.map(|_| AllowedUpdate::CallbackQuery)
.into_iter(),
)
.chain(
self.shipping_queries_queue
.as_ref()
.map(|_| AllowedUpdate::ShippingQuery)
.into_iter(),
)
.chain(
self.pre_checkout_queries_queue
.as_ref()
.map(|_| AllowedUpdate::PreCheckoutQuery)
.into_iter(),
)
.chain(self.polls_queue.as_ref().map(|_| AllowedUpdate::Poll).into_iter())
.chain(self.poll_answers_queue.as_ref().map(|_| AllowedUpdate::PollAnswer).into_iter())
.chain(
self.my_chat_members_queue
.as_ref()
.map(|_| AllowedUpdate::MyChatMember)
.into_iter(),
)
.chain(self.chat_members_queue.as_ref().map(|_| AllowedUpdate::ChatMember).into_iter());
listener.hint_allowed_updates(&mut allowed);
}
async fn wait_for_handlers(&mut self) {
log::debug!("Waiting for handlers to finish");

View file

@ -107,7 +107,10 @@ use futures::Stream;
use std::time::Duration;
use crate::{dispatching::stop_token::StopToken, types::Update};
use crate::{
dispatching::stop_token::StopToken,
types::{AllowedUpdate, Update},
};
mod polling;
mod stateful_listener;
@ -151,6 +154,21 @@ pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
the returned token"]
fn stop_token(&mut self) -> Self::StopToken;
/// Hint which updates should the listener listen for.
///
/// For example [`polling()`] should send the hint as
/// [`GetUpdates::allowed_updates`]
///
/// Note however that this is a _hint_ and as such, it can be ignored. The
/// listener is not guaranteed to only return updates which types are listed
/// in the hint.
///
/// [`GetUpdates::allowed_updates`]:
/// crate::payloads::GetUpdates::allowed_updates
fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) {
let _ = hint;
}
/// The timeout duration hint.
///
/// This hints how often dispatcher should check for a shutdown. E.g., for

View file

@ -146,9 +146,15 @@ where
let stop_token = |st: &mut State<_>| st.token.clone();
let hint_allowed_updates =
Some(|state: &mut State<_>, allowed: &mut dyn Iterator<Item = AllowedUpdate>| {
// TODO: we should probably warn if there already were different allowed updates
// before
state.allowed_updates = Some(allowed.collect());
});
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener { state, stream, stop_token, timeout_hint }
StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint)
}
async fn delete_webhook_if_setup<R>(requester: &R)

View file

@ -1,11 +1,13 @@
use std::time::Duration;
use futures::Stream;
use teloxide_core::types::Update;
use crate::dispatching::{
stop_token::{self, StopToken},
update_listeners::{AsUpdateStream, UpdateListener},
use crate::{
dispatching::{
stop_token::{self, StopToken},
update_listeners::{AsUpdateStream, UpdateListener},
},
types::{AllowedUpdate, Update},
};
/// A listener created from functions.
@ -17,7 +19,7 @@ use crate::dispatching::{
///
/// [`polling`]: crate::dispatching::update_listeners::polling()
#[non_exhaustive]
pub struct StatefulListener<St, Assf, Sf, Thf> {
pub struct StatefulListener<St, Assf, Sf, Hauf, Thf> {
/// The state of the listener.
pub state: St,
@ -32,6 +34,12 @@ pub struct StatefulListener<St, Assf, Sf, Thf> {
/// Must be of type `for<'a> &'a mut St -> impl StopToken`.
pub stop_token: Sf,
/// The function used as [`UpdateListener::hint_allowed_updates`].
///
/// Must be of type `for<'a, 'b> &'a mut St, &'b mut dyn Iterator<Item =
/// AllowedUpdate> -> ()`.
pub hint_allowed_updates: Option<Hauf>,
/// The function used as [`UpdateListener::timeout_hint`].
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
@ -39,22 +47,26 @@ pub struct StatefulListener<St, Assf, Sf, Thf> {
pub timeout_hint: Option<Thf>,
}
impl<St, Assf, Sf> StatefulListener<St, Assf, Sf, for<'a> fn(&'a St) -> Option<Duration>> {
type Haufn<State> = for<'a, 'b> fn(&'a mut State, &'b mut dyn Iterator<Item = AllowedUpdate>);
type Thfn<State> = for<'a> fn(&'a State) -> Option<Duration>;
impl<St, Assf, Sf> StatefulListener<St, Assf, Sf, Haufn<St>, Thfn<St>> {
/// Creates a new stateful listener from its components.
pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self {
Self { state, stream, stop_token, timeout_hint: None }
Self::new_with_hints(state, stream, stop_token, None, None)
}
}
impl<St, Assf, Sf, Thf> StatefulListener<St, Assf, Sf, Thf> {
impl<St, Assf, Sf, Hauf, Thf> StatefulListener<St, Assf, Sf, Hauf, Thf> {
/// Creates a new stateful listener from its components.
pub fn new_with_timeout_hint(
pub fn new_with_hints(
state: St,
stream: Assf,
stop_token: Sf,
hint_allowed_updates: Option<Hauf>,
timeout_hint: Option<Thf>,
) -> Self {
Self { state, stream, stop_token, timeout_hint }
Self { state, stream, stop_token, hint_allowed_updates, timeout_hint }
}
}
@ -63,7 +75,8 @@ impl<S, E>
S,
for<'a> fn(&'a mut S) -> &'a mut S,
for<'a> fn(&'a mut S) -> stop_token::Noop,
for<'a> fn(&'a S) -> Option<Duration>,
Haufn<S>,
Thfn<S>,
>
where
S: Stream<Item = Result<Update, E>> + Unpin + 'static,
@ -73,11 +86,12 @@ where
///
/// It won't be possible to ever stop this listener with a stop token.
pub fn from_stream_without_graceful_shutdown(stream: S) -> Self {
let this = Self {
state: stream,
stream: |s| s,
stop_token: |_| stop_token::Noop,
timeout_hint: Some(|_| {
let this = Self::new_with_hints(
stream,
|s| s,
|_| stop_token::Noop,
None,
Some(|_| {
// FIXME: replace this by just Duration::MAX once 1.53 releases
// be released
const NANOS_PER_SEC: u32 = 1_000_000_000;
@ -85,13 +99,14 @@ where
Some(dmax)
}),
};
);
assert_update_listener(this)
}
}
impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf>
impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a, E>
for StatefulListener<St, Assf, Hauf, Sf, Thf>
where
(St, Strm): 'a,
Assf: FnMut(&'a mut St) -> Strm,
@ -104,11 +119,13 @@ where
}
}
impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf>
impl<St, Assf, Sf, Hauf, Stt, Thf, E> UpdateListener<E>
for StatefulListener<St, Assf, Sf, Hauf, Thf>
where
Self: for<'a> AsUpdateStream<'a, E>,
Sf: FnMut(&mut St) -> Stt,
Stt: StopToken,
Hauf: FnMut(&mut St, &mut dyn Iterator<Item = AllowedUpdate>),
Thf: Fn(&St) -> Option<Duration>,
{
type StopToken = Stt;
@ -117,6 +134,12 @@ where
(self.stop_token)(&mut self.state)
}
fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) {
if let Some(f) = &mut self.hint_allowed_updates {
f(&mut self.state, hint);
}
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout_hint.as_ref().and_then(|f| f(&self.state))
}