From 5c40bfa78c8f276f1a66fb1d03dbb2f14aaf8e71 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Fri, 9 Sep 2022 21:14:44 +0400 Subject: [PATCH 1/2] Replace `StopToken` trait with a single type Former-commit-id: 9527f82608eda0b1a737aa04f473e5dc50d0e677 --- CHANGELOG.md | 5 ++ src/dispatching.rs | 1 - src/dispatching/dispatcher.rs | 2 +- src/dispatching/stop_token.rs | 79 ------------------- src/dispatching/update_listeners.rs | 7 +- src/dispatching/update_listeners/polling.rs | 15 ++-- .../update_listeners/stateful_listener.rs | 51 ++---------- .../update_listeners/webhooks/axum.rs | 22 +++--- src/lib.rs | 1 + src/stop.rs | 67 ++++++++++++++++ 10 files changed, 97 insertions(+), 153 deletions(-) delete mode 100644 src/dispatching/stop_token.rs create mode 100644 src/stop.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index a27331e8..c4995282 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Updated `teloxide-macros` see its [changelog](https://github.com/teloxide/teloxide-macros/blob/master/CHANGELOG.md#unreleased) for more - `UpdateListener` now has an associated type `Err` instead of a generic - `AsUpdateStream` now has an associated type `StreamErr` instead of a generic +- Rename `dispatching::stop_token::{AsyncStopToken, AsyncStopFlag}` => `stop::{StopToken, StopFlag}` + +### Removed + +- `dispatching::stop_token::StopToken` trait (all uses are replaced with `stop::StopToken` structure) ## 0.10.1 - 2022-07-22 diff --git a/src/dispatching.rs b/src/dispatching.rs index 0fe237bd..b650b308 100644 --- a/src/dispatching.rs +++ b/src/dispatching.rs @@ -233,7 +233,6 @@ mod filter_ext; mod handler_description; mod handler_ext; mod handler_factory; -pub mod stop_token; pub mod update_listeners; pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken}; diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 42ae20bd..c44ac97e 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,6 +1,6 @@ use crate::{ dispatching::{ - distribution::default_distribution_function, stop_token::StopToken, update_listeners, + distribution::default_distribution_function, update_listeners, update_listeners::UpdateListener, DefaultKey, DpHandlerDescription, ShutdownToken, }, error_handlers::{ErrorHandler, LoggingErrorHandler}, diff --git a/src/dispatching/stop_token.rs b/src/dispatching/stop_token.rs deleted file mode 100644 index f9c25aff..00000000 --- a/src/dispatching/stop_token.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! A stop token used to stop a listener. - -use std::{future::Future, pin::Pin, task}; - -use futures::future::{pending, AbortHandle, Abortable, Pending}; - -/// A stop token allows you to stop a listener. -/// -/// See also: [`UpdateListener::stop_token`]. -/// -/// [`UpdateListener::stop_token`]: -/// crate::dispatching::update_listeners::UpdateListener::stop_token -pub trait StopToken { - /// Stop the listener linked to this token. - fn stop(self); -} - -/// A stop token which does nothing. May be used in prototyping or in cases -/// where you do not care about graceful shutdowning. -pub struct Noop; - -impl StopToken for Noop { - fn stop(self) {} -} - -/// A stop token which corresponds to [`AsyncStopFlag`]. -#[derive(Clone)] -pub struct AsyncStopToken(AbortHandle); - -/// A flag which corresponds to [`AsyncStopToken`]. -/// -/// To know if the stop token was used you can either repeatedly call -/// [`is_stopped`] or use this type as a `Future`. -/// -/// [`is_stopped`]: AsyncStopFlag::is_stopped -#[pin_project::pin_project] -#[derive(Clone)] -pub struct AsyncStopFlag(#[pin] Abortable>); - -impl AsyncStopToken { - /// Create a new token/flag pair. - #[must_use = "This function is pure, that is does nothing unless its output is used"] - pub fn new_pair() -> (Self, AsyncStopFlag) { - let (handle, reg) = AbortHandle::new_pair(); - let token = Self(handle); - let flag = AsyncStopFlag(Abortable::new(pending(), reg)); - - (token, flag) - } -} - -impl StopToken for AsyncStopToken { - fn stop(self) { - self.0.abort() - } -} - -impl AsyncStopFlag { - /// Returns true if the stop token linked to `self` was used. - #[must_use = "This function is pure, that is does nothing unless its output is used"] - pub fn is_stopped(&self) -> bool { - self.0.is_aborted() - } -} - -/// This future resolves when a stop token was used. -impl Future for AsyncStopFlag { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - self.project().0.poll(cx).map(|res| { - debug_assert!( - res.is_err(), - "Pending Future can't ever be resolved, so Abortable is only resolved when \ - canceled" - ); - }) - } -} diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 42169037..4103e398 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -35,7 +35,7 @@ use futures::Stream; use std::time::Duration; use crate::{ - dispatching::stop_token::StopToken, + stop::StopToken, types::{AllowedUpdate, Update}, }; @@ -65,9 +65,6 @@ pub trait UpdateListener: /// The type of errors that can be returned from this listener. type Err; - /// The type of token which allows to stop this listener. - type StopToken: StopToken + Send; - /// Returns a token which stops this listener. /// /// The [`stop`] function of the token is not guaranteed to have an @@ -81,7 +78,7 @@ pub trait UpdateListener: /// soon as all cached updates are returned. #[must_use = "This function doesn't stop listening, to stop listening you need to call `stop` \ on the returned token"] - fn stop_token(&mut self) -> Self::StopToken; + fn stop_token(&mut self) -> StopToken; /// Hint which updates should the listener listen for. /// diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 74824bba..1543b322 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -13,11 +13,9 @@ use std::{ use futures::{ready, stream::Stream}; use crate::{ - dispatching::{ - stop_token::{AsyncStopFlag, AsyncStopToken}, - update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, - }, + dispatching::update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, requests::{HasPayload, Request, Requester}, + stop::{mk_stop_token, StopFlag, StopToken}, types::{AllowedUpdate, Update}, }; @@ -98,7 +96,7 @@ where /// See also: [`polling_default`], [`Polling`]. pub fn build(self) -> Polling { let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; - let (token, flag) = AsyncStopToken::new_pair(); + let (token, flag) = mk_stop_token(); let polling = Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token }; @@ -242,8 +240,8 @@ pub struct Polling { limit: Option, allowed_updates: Option>, drop_pending_updates: bool, - flag: AsyncStopFlag, - token: AsyncStopToken, + flag: StopFlag, + token: StopToken, } impl Polling @@ -293,9 +291,8 @@ pub struct PollingStream<'a, B: Requester> { impl UpdateListener for Polling { type Err = B::Err; - type StopToken = AsyncStopToken; - fn stop_token(&mut self) -> Self::StopToken { + fn stop_token(&mut self) -> StopToken { self.token.clone() } diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs index 7f23d795..0eec921d 100644 --- a/src/dispatching/update_listeners/stateful_listener.rs +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -3,10 +3,8 @@ use std::time::Duration; use futures::Stream; use crate::{ - dispatching::{ - stop_token::{self, StopToken}, - update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, - }, + dispatching::update_listeners::{AsUpdateStream, UpdateListener}, + stop::StopToken, types::{AllowedUpdate, Update}, }; @@ -30,7 +28,7 @@ pub struct StatefulListener { /// The function used as [`UpdateListener::stop_token`]. /// - /// Must implement `FnMut(&mut St) -> impl StopToken`. + /// Must implement `FnMut(&mut St) -> StopToken`. pub stop_token: Sf, /// The function used as [`UpdateListener::hint_allowed_updates`]. @@ -68,41 +66,6 @@ impl StatefulListener { } } -impl - StatefulListener< - S, - for<'a> fn(&'a mut S) -> &'a mut S, - for<'a> fn(&'a mut S) -> stop_token::Noop, - Haufn, - Thfn, - > -where - S: Stream> + Unpin + Send + 'static, -{ - /// Creates a new update listener from a stream of updates which ignores - /// stop signals. - /// - /// It won't be possible to ever stop this listener with a stop token. - pub fn from_stream_without_graceful_shutdown(stream: S) -> Self { - let this = Self::new_with_hints( - stream, - |s| s, - |_| stop_token::Noop, - None, - Some(|_| { - // FIXME: replace this by just Duration::MAX once 1.53 releases - // be released - const NANOS_PER_SEC: u32 = 1_000_000_000; - let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1); - - Some(dmax) - }), - ); - - assert_update_listener(this) - } -} - impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a> for StatefulListener where @@ -119,18 +82,16 @@ where } } -impl UpdateListener for StatefulListener +impl UpdateListener for StatefulListener where Self: for<'a> AsUpdateStream<'a, StreamErr = E>, - Sf: FnMut(&mut St) -> Stt, - Stt: StopToken + Send, + Sf: FnMut(&mut St) -> StopToken, Hauf: FnMut(&mut St, &mut dyn Iterator), Thf: Fn(&St) -> Option, { type Err = E; - type StopToken = Stt; - fn stop_token(&mut self) -> Stt { + fn stop_token(&mut self) -> StopToken { (self.stop_token)(&mut self.state) } diff --git a/src/dispatching/update_listeners/webhooks/axum.rs b/src/dispatching/update_listeners/webhooks/axum.rs index 82669a0f..7bbe03ba 100644 --- a/src/dispatching/update_listeners/webhooks/axum.rs +++ b/src/dispatching/update_listeners/webhooks/axum.rs @@ -6,11 +6,9 @@ use axum::{ }; use crate::{ - dispatching::{ - stop_token::{AsyncStopFlag, StopToken}, - update_listeners::{webhooks::Options, UpdateListener}, - }, + dispatching::update_listeners::{webhooks::Options, UpdateListener}, requests::Requester, + stop::StopFlag, }; /// Webhook implementation based on the [mod@axum] framework. @@ -22,7 +20,7 @@ use crate::{ /// /// [`set_webhook`]: crate::payloads::SetWebhook /// [`delete_webhook`]: crate::payloads::DeleteWebhook -/// [`stop`]: StopToken::stop +/// [`stop`]: crate::stop::StopToken::stop /// /// ## Panics /// @@ -88,7 +86,7 @@ where /// /// [`set_webhook`]: crate::payloads::SetWebhook /// [`delete_webhook`]: crate::payloads::DeleteWebhook -/// [`stop`]: StopToken::stop +/// [`stop`]: crate::stop::StopToken::stop /// [`options.address`]: Options::address /// [`with_graceful_shutdown`]: axum::Server::with_graceful_shutdown /// @@ -156,10 +154,8 @@ pub fn axum_no_setup( options: Options, ) -> (impl UpdateListener, impl Future, axum::Router) { use crate::{ - dispatching::{ - stop_token::AsyncStopToken, - update_listeners::{self, webhooks::tuple_first_mut}, - }, + dispatching::update_listeners::{self, webhooks::tuple_first_mut}, + stop::{mk_stop_token, StopToken}, types::Update, }; use axum::{extract::Extension, response::IntoResponse, routing::post}; @@ -178,7 +174,7 @@ pub fn axum_no_setup( secret_header: XTelegramBotApiSecretToken, secret: Extension>, tx: Extension, - flag: Extension, + flag: Extension, ) -> impl IntoResponse { // FIXME: use constant time comparison here if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) { @@ -214,7 +210,7 @@ pub fn axum_no_setup( StatusCode::OK } - let (stop_token, stop_flag) = AsyncStopToken::new_pair(); + let (stop_token, stop_flag) = mk_stop_token(); let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer( ServiceBuilder::new() @@ -231,7 +227,7 @@ pub fn axum_no_setup( let listener = update_listeners::StatefulListener::new( (stream, stop_token), tuple_first_mut, - |state: &mut (_, AsyncStopToken)| state.1.clone(), + |state: &mut (_, StopToken)| state.1.clone(), ); (listener, stop_flag, app) diff --git a/src/lib.rs b/src/lib.rs index 2964cd74..21ae0338 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,6 +67,7 @@ mod logging; pub mod dispatching; pub mod error_handlers; pub mod prelude; +pub mod stop; pub mod utils; #[doc(inline)] diff --git a/src/stop.rs b/src/stop.rs new file mode 100644 index 00000000..4fdfc478 --- /dev/null +++ b/src/stop.rs @@ -0,0 +1,67 @@ +//! This module contains stop [token] and stop [flag] that are used to stop +//! async tasks, for example [listeners]. +//! +//! [token]: StopToken +//! [flag]: StopFlag +//! [listeners]: crate::dispatching::update_listeners + +use std::{future::Future, pin::Pin, task}; + +use futures::future::{pending, AbortHandle, Abortable, Pending}; + +/// Create a new token/flag pair. +#[must_use] +pub fn mk_stop_token() -> (StopToken, StopFlag) { + let (handle, reg) = AbortHandle::new_pair(); + let token = StopToken(handle); + let flag = StopFlag(Abortable::new(pending(), reg)); + + (token, flag) +} + +/// A stop token which corresponds to a [`StopFlag`]. +#[derive(Clone)] +pub struct StopToken(AbortHandle); + +/// A flag which corresponds to [`StopToken`]. +/// +/// To know if the stop token was used you can either repeatedly call +/// [`is_stopped`] or use this type as a `Future`. +/// +/// [`is_stopped`]: StopFlag::is_stopped +#[pin_project::pin_project] +#[derive(Clone)] +pub struct StopFlag(#[pin] Abortable>); + +impl StopToken { + /// "Stops" the flag associated with this token. + /// + /// Note that calling this function multiple times does nothing, only the + /// first call changes the state. + pub fn stop(&self) { + self.0.abort() + } +} + +impl StopFlag { + /// Returns true if the stop token linked to `self` was used. + #[must_use] + pub fn is_stopped(&self) -> bool { + self.0.is_aborted() + } +} + +/// This future resolves when a stop token was used. +impl Future for StopFlag { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + self.project().0.poll(cx).map(|res| { + debug_assert!( + res.is_err(), + "Pending Future can't ever be resolved, so Abortable is only resolved when \ + canceled" + ); + }) + } +} From 1fad4ab3b3c5adeae27817acac5f4425f36d953a Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Fri, 9 Sep 2022 21:39:39 +0400 Subject: [PATCH 2/2] Improve `StopFlag`'s implementation Former-commit-id: 0807eb57e15e2ea00b14392dde22e917e04d6a19 --- src/stop.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/stop.rs b/src/stop.rs index 4fdfc478..f61caf3d 100644 --- a/src/stop.rs +++ b/src/stop.rs @@ -5,7 +5,7 @@ //! [flag]: StopFlag //! [listeners]: crate::dispatching::update_listeners -use std::{future::Future, pin::Pin, task}; +use std::{convert::Infallible, future::Future, pin::Pin, task}; use futures::future::{pending, AbortHandle, Abortable, Pending}; @@ -31,7 +31,7 @@ pub struct StopToken(AbortHandle); /// [`is_stopped`]: StopFlag::is_stopped #[pin_project::pin_project] #[derive(Clone)] -pub struct StopFlag(#[pin] Abortable>); +pub struct StopFlag(#[pin] Abortable>); impl StopToken { /// "Stops" the flag associated with this token. @@ -56,12 +56,9 @@ impl Future for StopFlag { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { - self.project().0.poll(cx).map(|res| { - debug_assert!( - res.is_err(), - "Pending Future can't ever be resolved, so Abortable is only resolved when \ - canceled" - ); + self.project().0.poll(cx).map(|res| match res { + Err(_aborted) => (), + Ok(unreachable) => match unreachable {}, }) } }