Merge pull request #709 from teloxide/stop_refactor

Refactor `stop_token` module
This commit is contained in:
Waffle Maybe 2022-09-25 05:24:06 +04:00 committed by GitHub
commit 8272d4139a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 94 additions and 153 deletions

View file

@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Updated `teloxide-macros` see its [changelog](https://github.com/teloxide/teloxide-macros/blob/master/CHANGELOG.md#unreleased) for more - Updated `teloxide-macros` see its [changelog](https://github.com/teloxide/teloxide-macros/blob/master/CHANGELOG.md#unreleased) for more
- `UpdateListener` now has an associated type `Err` instead of a generic - `UpdateListener` now has an associated type `Err` instead of a generic
- `AsUpdateStream` now has an associated type `StreamErr` instead of a generic - `AsUpdateStream` now has an associated type `StreamErr` instead of a generic
- Rename `dispatching::stop_token::{AsyncStopToken, AsyncStopFlag}` => `stop::{StopToken, StopFlag}`
### Removed
- `dispatching::stop_token::StopToken` trait (all uses are replaced with `stop::StopToken` structure)
## 0.10.1 - 2022-07-22 ## 0.10.1 - 2022-07-22

View file

@ -233,7 +233,6 @@ mod filter_ext;
mod handler_description; mod handler_description;
mod handler_ext; mod handler_ext;
mod handler_factory; mod handler_factory;
pub mod stop_token;
pub mod update_listeners; pub mod update_listeners;
pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken}; pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken};

View file

@ -1,6 +1,6 @@
use crate::{ use crate::{
dispatching::{ dispatching::{
distribution::default_distribution_function, stop_token::StopToken, update_listeners, distribution::default_distribution_function, update_listeners,
update_listeners::UpdateListener, DefaultKey, DpHandlerDescription, ShutdownToken, update_listeners::UpdateListener, DefaultKey, DpHandlerDescription, ShutdownToken,
}, },
error_handlers::{ErrorHandler, LoggingErrorHandler}, error_handlers::{ErrorHandler, LoggingErrorHandler},

View file

@ -1,79 +0,0 @@
//! A stop token used to stop a listener.
use std::{future::Future, pin::Pin, task};
use futures::future::{pending, AbortHandle, Abortable, Pending};
/// A stop token allows you to stop a listener.
///
/// See also: [`UpdateListener::stop_token`].
///
/// [`UpdateListener::stop_token`]:
/// crate::dispatching::update_listeners::UpdateListener::stop_token
pub trait StopToken {
/// Stop the listener linked to this token.
fn stop(self);
}
/// A stop token which does nothing. May be used in prototyping or in cases
/// where you do not care about graceful shutdowning.
pub struct Noop;
impl StopToken for Noop {
fn stop(self) {}
}
/// A stop token which corresponds to [`AsyncStopFlag`].
#[derive(Clone)]
pub struct AsyncStopToken(AbortHandle);
/// A flag which corresponds to [`AsyncStopToken`].
///
/// To know if the stop token was used you can either repeatedly call
/// [`is_stopped`] or use this type as a `Future`.
///
/// [`is_stopped`]: AsyncStopFlag::is_stopped
#[pin_project::pin_project]
#[derive(Clone)]
pub struct AsyncStopFlag(#[pin] Abortable<Pending<()>>);
impl AsyncStopToken {
/// Create a new token/flag pair.
#[must_use = "This function is pure, that is does nothing unless its output is used"]
pub fn new_pair() -> (Self, AsyncStopFlag) {
let (handle, reg) = AbortHandle::new_pair();
let token = Self(handle);
let flag = AsyncStopFlag(Abortable::new(pending(), reg));
(token, flag)
}
}
impl StopToken for AsyncStopToken {
fn stop(self) {
self.0.abort()
}
}
impl AsyncStopFlag {
/// Returns true if the stop token linked to `self` was used.
#[must_use = "This function is pure, that is does nothing unless its output is used"]
pub fn is_stopped(&self) -> bool {
self.0.is_aborted()
}
}
/// This future resolves when a stop token was used.
impl Future for AsyncStopFlag {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().0.poll(cx).map(|res| {
debug_assert!(
res.is_err(),
"Pending Future can't ever be resolved, so Abortable is only resolved when \
canceled"
);
})
}
}

View file

@ -35,7 +35,7 @@ use futures::Stream;
use std::time::Duration; use std::time::Duration;
use crate::{ use crate::{
dispatching::stop_token::StopToken, stop::StopToken,
types::{AllowedUpdate, Update}, types::{AllowedUpdate, Update},
}; };
@ -65,9 +65,6 @@ pub trait UpdateListener:
/// The type of errors that can be returned from this listener. /// The type of errors that can be returned from this listener.
type Err; type Err;
/// The type of token which allows to stop this listener.
type StopToken: StopToken + Send;
/// Returns a token which stops this listener. /// Returns a token which stops this listener.
/// ///
/// The [`stop`] function of the token is not guaranteed to have an /// The [`stop`] function of the token is not guaranteed to have an
@ -81,7 +78,7 @@ pub trait UpdateListener:
/// soon as all cached updates are returned. /// soon as all cached updates are returned.
#[must_use = "This function doesn't stop listening, to stop listening you need to call `stop` \ #[must_use = "This function doesn't stop listening, to stop listening you need to call `stop` \
on the returned token"] on the returned token"]
fn stop_token(&mut self) -> Self::StopToken; fn stop_token(&mut self) -> StopToken;
/// Hint which updates should the listener listen for. /// Hint which updates should the listener listen for.
/// ///

View file

@ -13,11 +13,9 @@ use std::{
use futures::{ready, stream::Stream}; use futures::{ready, stream::Stream};
use crate::{ use crate::{
dispatching::{ dispatching::update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
},
requests::{HasPayload, Request, Requester}, requests::{HasPayload, Request, Requester},
stop::{mk_stop_token, StopFlag, StopToken},
types::{AllowedUpdate, Update}, types::{AllowedUpdate, Update},
}; };
@ -98,7 +96,7 @@ where
/// See also: [`polling_default`], [`Polling`]. /// See also: [`polling_default`], [`Polling`].
pub fn build(self) -> Polling<R> { pub fn build(self) -> Polling<R> {
let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
let (token, flag) = AsyncStopToken::new_pair(); let (token, flag) = mk_stop_token();
let polling = let polling =
Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token }; Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token };
@ -242,8 +240,8 @@ pub struct Polling<B: Requester> {
limit: Option<u8>, limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>, allowed_updates: Option<Vec<AllowedUpdate>>,
drop_pending_updates: bool, drop_pending_updates: bool,
flag: AsyncStopFlag, flag: StopFlag,
token: AsyncStopToken, token: StopToken,
} }
impl<R> Polling<R> impl<R> Polling<R>
@ -293,9 +291,8 @@ pub struct PollingStream<'a, B: Requester> {
impl<B: Requester + Send + 'static> UpdateListener for Polling<B> { impl<B: Requester + Send + 'static> UpdateListener for Polling<B> {
type Err = B::Err; type Err = B::Err;
type StopToken = AsyncStopToken;
fn stop_token(&mut self) -> Self::StopToken { fn stop_token(&mut self) -> StopToken {
self.token.clone() self.token.clone()
} }

View file

@ -3,10 +3,8 @@ use std::time::Duration;
use futures::Stream; use futures::Stream;
use crate::{ use crate::{
dispatching::{ dispatching::update_listeners::{AsUpdateStream, UpdateListener},
stop_token::{self, StopToken}, stop::StopToken,
update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
},
types::{AllowedUpdate, Update}, types::{AllowedUpdate, Update},
}; };
@ -30,7 +28,7 @@ pub struct StatefulListener<St, Assf, Sf, Hauf, Thf> {
/// The function used as [`UpdateListener::stop_token`]. /// The function used as [`UpdateListener::stop_token`].
/// ///
/// Must implement `FnMut(&mut St) -> impl StopToken`. /// Must implement `FnMut(&mut St) -> StopToken`.
pub stop_token: Sf, pub stop_token: Sf,
/// The function used as [`UpdateListener::hint_allowed_updates`]. /// The function used as [`UpdateListener::hint_allowed_updates`].
@ -68,41 +66,6 @@ impl<St, Assf, Sf, Hauf, Thf> StatefulListener<St, Assf, Sf, Hauf, Thf> {
} }
} }
impl<S, E>
StatefulListener<
S,
for<'a> fn(&'a mut S) -> &'a mut S,
for<'a> fn(&'a mut S) -> stop_token::Noop,
Haufn<S>,
Thfn<S>,
>
where
S: Stream<Item = Result<Update, E>> + Unpin + Send + 'static,
{
/// Creates a new update listener from a stream of updates which ignores
/// stop signals.
///
/// It won't be possible to ever stop this listener with a stop token.
pub fn from_stream_without_graceful_shutdown(stream: S) -> Self {
let this = Self::new_with_hints(
stream,
|s| s,
|_| stop_token::Noop,
None,
Some(|_| {
// FIXME: replace this by just Duration::MAX once 1.53 releases
// be released
const NANOS_PER_SEC: u32 = 1_000_000_000;
let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1);
Some(dmax)
}),
);
assert_update_listener(this)
}
}
impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a> impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a>
for StatefulListener<St, Assf, Hauf, Sf, Thf> for StatefulListener<St, Assf, Hauf, Sf, Thf>
where where
@ -119,18 +82,16 @@ where
} }
} }
impl<St, Assf, Sf, Hauf, Stt, Thf, E> UpdateListener for StatefulListener<St, Assf, Sf, Hauf, Thf> impl<St, Assf, Sf, Hauf, Thf, E> UpdateListener for StatefulListener<St, Assf, Sf, Hauf, Thf>
where where
Self: for<'a> AsUpdateStream<'a, StreamErr = E>, Self: for<'a> AsUpdateStream<'a, StreamErr = E>,
Sf: FnMut(&mut St) -> Stt, Sf: FnMut(&mut St) -> StopToken,
Stt: StopToken + Send,
Hauf: FnMut(&mut St, &mut dyn Iterator<Item = AllowedUpdate>), Hauf: FnMut(&mut St, &mut dyn Iterator<Item = AllowedUpdate>),
Thf: Fn(&St) -> Option<Duration>, Thf: Fn(&St) -> Option<Duration>,
{ {
type Err = E; type Err = E;
type StopToken = Stt;
fn stop_token(&mut self) -> Stt { fn stop_token(&mut self) -> StopToken {
(self.stop_token)(&mut self.state) (self.stop_token)(&mut self.state)
} }

View file

@ -6,11 +6,9 @@ use axum::{
}; };
use crate::{ use crate::{
dispatching::{ dispatching::update_listeners::{webhooks::Options, UpdateListener},
stop_token::{AsyncStopFlag, StopToken},
update_listeners::{webhooks::Options, UpdateListener},
},
requests::Requester, requests::Requester,
stop::StopFlag,
}; };
/// Webhook implementation based on the [mod@axum] framework. /// Webhook implementation based on the [mod@axum] framework.
@ -22,7 +20,7 @@ use crate::{
/// ///
/// [`set_webhook`]: crate::payloads::SetWebhook /// [`set_webhook`]: crate::payloads::SetWebhook
/// [`delete_webhook`]: crate::payloads::DeleteWebhook /// [`delete_webhook`]: crate::payloads::DeleteWebhook
/// [`stop`]: StopToken::stop /// [`stop`]: crate::stop::StopToken::stop
/// ///
/// ## Panics /// ## Panics
/// ///
@ -88,7 +86,7 @@ where
/// ///
/// [`set_webhook`]: crate::payloads::SetWebhook /// [`set_webhook`]: crate::payloads::SetWebhook
/// [`delete_webhook`]: crate::payloads::DeleteWebhook /// [`delete_webhook`]: crate::payloads::DeleteWebhook
/// [`stop`]: StopToken::stop /// [`stop`]: crate::stop::StopToken::stop
/// [`options.address`]: Options::address /// [`options.address`]: Options::address
/// [`with_graceful_shutdown`]: axum::Server::with_graceful_shutdown /// [`with_graceful_shutdown`]: axum::Server::with_graceful_shutdown
/// ///
@ -156,10 +154,8 @@ pub fn axum_no_setup(
options: Options, options: Options,
) -> (impl UpdateListener<Err = Infallible>, impl Future<Output = ()>, axum::Router) { ) -> (impl UpdateListener<Err = Infallible>, impl Future<Output = ()>, axum::Router) {
use crate::{ use crate::{
dispatching::{ dispatching::update_listeners::{self, webhooks::tuple_first_mut},
stop_token::AsyncStopToken, stop::{mk_stop_token, StopToken},
update_listeners::{self, webhooks::tuple_first_mut},
},
types::Update, types::Update,
}; };
use axum::{extract::Extension, response::IntoResponse, routing::post}; use axum::{extract::Extension, response::IntoResponse, routing::post};
@ -178,7 +174,7 @@ pub fn axum_no_setup(
secret_header: XTelegramBotApiSecretToken, secret_header: XTelegramBotApiSecretToken,
secret: Extension<Option<String>>, secret: Extension<Option<String>>,
tx: Extension<CSender>, tx: Extension<CSender>,
flag: Extension<AsyncStopFlag>, flag: Extension<StopFlag>,
) -> impl IntoResponse { ) -> impl IntoResponse {
// FIXME: use constant time comparison here // FIXME: use constant time comparison here
if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) { if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
@ -214,7 +210,7 @@ pub fn axum_no_setup(
StatusCode::OK StatusCode::OK
} }
let (stop_token, stop_flag) = AsyncStopToken::new_pair(); let (stop_token, stop_flag) = mk_stop_token();
let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer( let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer(
ServiceBuilder::new() ServiceBuilder::new()
@ -231,7 +227,7 @@ pub fn axum_no_setup(
let listener = update_listeners::StatefulListener::new( let listener = update_listeners::StatefulListener::new(
(stream, stop_token), (stream, stop_token),
tuple_first_mut, tuple_first_mut,
|state: &mut (_, AsyncStopToken)| state.1.clone(), |state: &mut (_, StopToken)| state.1.clone(),
); );
(listener, stop_flag, app) (listener, stop_flag, app)

View file

@ -67,6 +67,7 @@ mod logging;
pub mod dispatching; pub mod dispatching;
pub mod error_handlers; pub mod error_handlers;
pub mod prelude; pub mod prelude;
pub mod stop;
pub mod utils; pub mod utils;
#[doc(inline)] #[doc(inline)]

64
src/stop.rs Normal file
View file

@ -0,0 +1,64 @@
//! This module contains stop [token] and stop [flag] that are used to stop
//! async tasks, for example [listeners].
//!
//! [token]: StopToken
//! [flag]: StopFlag
//! [listeners]: crate::dispatching::update_listeners
use std::{convert::Infallible, future::Future, pin::Pin, task};
use futures::future::{pending, AbortHandle, Abortable, Pending};
/// Create a new token/flag pair.
#[must_use]
pub fn mk_stop_token() -> (StopToken, StopFlag) {
let (handle, reg) = AbortHandle::new_pair();
let token = StopToken(handle);
let flag = StopFlag(Abortable::new(pending(), reg));
(token, flag)
}
/// A stop token which corresponds to a [`StopFlag`].
#[derive(Clone)]
pub struct StopToken(AbortHandle);
/// A flag which corresponds to [`StopToken`].
///
/// To know if the stop token was used you can either repeatedly call
/// [`is_stopped`] or use this type as a `Future`.
///
/// [`is_stopped`]: StopFlag::is_stopped
#[pin_project::pin_project]
#[derive(Clone)]
pub struct StopFlag(#[pin] Abortable<Pending<Infallible>>);
impl StopToken {
/// "Stops" the flag associated with this token.
///
/// Note that calling this function multiple times does nothing, only the
/// first call changes the state.
pub fn stop(&self) {
self.0.abort()
}
}
impl StopFlag {
/// Returns true if the stop token linked to `self` was used.
#[must_use]
pub fn is_stopped(&self) -> bool {
self.0.is_aborted()
}
}
/// This future resolves when a stop token was used.
impl Future for StopFlag {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().0.poll(cx).map(|res| match res {
Err(_aborted) => (),
Ok(unreachable) => match unreachable {},
})
}
}