mirror of
https://github.com/teloxide/teloxide.git
synced 2024-12-22 14:35:36 +01:00
Merge pull request #709 from teloxide/stop_refactor
Refactor `stop_token` module
Former-commit-id: 8272d4139a
This commit is contained in:
commit
2e3d678987
10 changed files with 94 additions and 153 deletions
|
@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
- Updated `teloxide-macros` see its [changelog](https://github.com/teloxide/teloxide-macros/blob/master/CHANGELOG.md#unreleased) for more
|
||||
- `UpdateListener` now has an associated type `Err` instead of a generic
|
||||
- `AsUpdateStream` now has an associated type `StreamErr` instead of a generic
|
||||
- Rename `dispatching::stop_token::{AsyncStopToken, AsyncStopFlag}` => `stop::{StopToken, StopFlag}`
|
||||
|
||||
### Removed
|
||||
|
||||
- `dispatching::stop_token::StopToken` trait (all uses are replaced with `stop::StopToken` structure)
|
||||
|
||||
## 0.10.1 - 2022-07-22
|
||||
|
||||
|
|
|
@ -233,7 +233,6 @@ mod filter_ext;
|
|||
mod handler_description;
|
||||
mod handler_ext;
|
||||
mod handler_factory;
|
||||
pub mod stop_token;
|
||||
pub mod update_listeners;
|
||||
|
||||
pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken};
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::{
|
||||
dispatching::{
|
||||
distribution::default_distribution_function, stop_token::StopToken, update_listeners,
|
||||
distribution::default_distribution_function, update_listeners,
|
||||
update_listeners::UpdateListener, DefaultKey, DpHandlerDescription, ShutdownToken,
|
||||
},
|
||||
error_handlers::{ErrorHandler, LoggingErrorHandler},
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
//! A stop token used to stop a listener.
|
||||
|
||||
use std::{future::Future, pin::Pin, task};
|
||||
|
||||
use futures::future::{pending, AbortHandle, Abortable, Pending};
|
||||
|
||||
/// A stop token allows you to stop a listener.
|
||||
///
|
||||
/// See also: [`UpdateListener::stop_token`].
|
||||
///
|
||||
/// [`UpdateListener::stop_token`]:
|
||||
/// crate::dispatching::update_listeners::UpdateListener::stop_token
|
||||
pub trait StopToken {
|
||||
/// Stop the listener linked to this token.
|
||||
fn stop(self);
|
||||
}
|
||||
|
||||
/// A stop token which does nothing. May be used in prototyping or in cases
|
||||
/// where you do not care about graceful shutdowning.
|
||||
pub struct Noop;
|
||||
|
||||
impl StopToken for Noop {
|
||||
fn stop(self) {}
|
||||
}
|
||||
|
||||
/// A stop token which corresponds to [`AsyncStopFlag`].
|
||||
#[derive(Clone)]
|
||||
pub struct AsyncStopToken(AbortHandle);
|
||||
|
||||
/// A flag which corresponds to [`AsyncStopToken`].
|
||||
///
|
||||
/// To know if the stop token was used you can either repeatedly call
|
||||
/// [`is_stopped`] or use this type as a `Future`.
|
||||
///
|
||||
/// [`is_stopped`]: AsyncStopFlag::is_stopped
|
||||
#[pin_project::pin_project]
|
||||
#[derive(Clone)]
|
||||
pub struct AsyncStopFlag(#[pin] Abortable<Pending<()>>);
|
||||
|
||||
impl AsyncStopToken {
|
||||
/// Create a new token/flag pair.
|
||||
#[must_use = "This function is pure, that is does nothing unless its output is used"]
|
||||
pub fn new_pair() -> (Self, AsyncStopFlag) {
|
||||
let (handle, reg) = AbortHandle::new_pair();
|
||||
let token = Self(handle);
|
||||
let flag = AsyncStopFlag(Abortable::new(pending(), reg));
|
||||
|
||||
(token, flag)
|
||||
}
|
||||
}
|
||||
|
||||
impl StopToken for AsyncStopToken {
|
||||
fn stop(self) {
|
||||
self.0.abort()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncStopFlag {
|
||||
/// Returns true if the stop token linked to `self` was used.
|
||||
#[must_use = "This function is pure, that is does nothing unless its output is used"]
|
||||
pub fn is_stopped(&self) -> bool {
|
||||
self.0.is_aborted()
|
||||
}
|
||||
}
|
||||
|
||||
/// This future resolves when a stop token was used.
|
||||
impl Future for AsyncStopFlag {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
|
||||
self.project().0.poll(cx).map(|res| {
|
||||
debug_assert!(
|
||||
res.is_err(),
|
||||
"Pending Future can't ever be resolved, so Abortable is only resolved when \
|
||||
canceled"
|
||||
);
|
||||
})
|
||||
}
|
||||
}
|
|
@ -35,7 +35,7 @@ use futures::Stream;
|
|||
use std::time::Duration;
|
||||
|
||||
use crate::{
|
||||
dispatching::stop_token::StopToken,
|
||||
stop::StopToken,
|
||||
types::{AllowedUpdate, Update},
|
||||
};
|
||||
|
||||
|
@ -65,9 +65,6 @@ pub trait UpdateListener:
|
|||
/// The type of errors that can be returned from this listener.
|
||||
type Err;
|
||||
|
||||
/// The type of token which allows to stop this listener.
|
||||
type StopToken: StopToken + Send;
|
||||
|
||||
/// Returns a token which stops this listener.
|
||||
///
|
||||
/// The [`stop`] function of the token is not guaranteed to have an
|
||||
|
@ -81,7 +78,7 @@ pub trait UpdateListener:
|
|||
/// soon as all cached updates are returned.
|
||||
#[must_use = "This function doesn't stop listening, to stop listening you need to call `stop` \
|
||||
on the returned token"]
|
||||
fn stop_token(&mut self) -> Self::StopToken;
|
||||
fn stop_token(&mut self) -> StopToken;
|
||||
|
||||
/// Hint which updates should the listener listen for.
|
||||
///
|
||||
|
|
|
@ -13,11 +13,9 @@ use std::{
|
|||
use futures::{ready, stream::Stream};
|
||||
|
||||
use crate::{
|
||||
dispatching::{
|
||||
stop_token::{AsyncStopFlag, AsyncStopToken},
|
||||
update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
|
||||
},
|
||||
dispatching::update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
|
||||
requests::{HasPayload, Request, Requester},
|
||||
stop::{mk_stop_token, StopFlag, StopToken},
|
||||
types::{AllowedUpdate, Update},
|
||||
};
|
||||
|
||||
|
@ -98,7 +96,7 @@ where
|
|||
/// See also: [`polling_default`], [`Polling`].
|
||||
pub fn build(self) -> Polling<R> {
|
||||
let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
|
||||
let (token, flag) = AsyncStopToken::new_pair();
|
||||
let (token, flag) = mk_stop_token();
|
||||
let polling =
|
||||
Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token };
|
||||
|
||||
|
@ -242,8 +240,8 @@ pub struct Polling<B: Requester> {
|
|||
limit: Option<u8>,
|
||||
allowed_updates: Option<Vec<AllowedUpdate>>,
|
||||
drop_pending_updates: bool,
|
||||
flag: AsyncStopFlag,
|
||||
token: AsyncStopToken,
|
||||
flag: StopFlag,
|
||||
token: StopToken,
|
||||
}
|
||||
|
||||
impl<R> Polling<R>
|
||||
|
@ -293,9 +291,8 @@ pub struct PollingStream<'a, B: Requester> {
|
|||
|
||||
impl<B: Requester + Send + 'static> UpdateListener for Polling<B> {
|
||||
type Err = B::Err;
|
||||
type StopToken = AsyncStopToken;
|
||||
|
||||
fn stop_token(&mut self) -> Self::StopToken {
|
||||
fn stop_token(&mut self) -> StopToken {
|
||||
self.token.clone()
|
||||
}
|
||||
|
||||
|
|
|
@ -3,10 +3,8 @@ use std::time::Duration;
|
|||
use futures::Stream;
|
||||
|
||||
use crate::{
|
||||
dispatching::{
|
||||
stop_token::{self, StopToken},
|
||||
update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
|
||||
},
|
||||
dispatching::update_listeners::{AsUpdateStream, UpdateListener},
|
||||
stop::StopToken,
|
||||
types::{AllowedUpdate, Update},
|
||||
};
|
||||
|
||||
|
@ -30,7 +28,7 @@ pub struct StatefulListener<St, Assf, Sf, Hauf, Thf> {
|
|||
|
||||
/// The function used as [`UpdateListener::stop_token`].
|
||||
///
|
||||
/// Must implement `FnMut(&mut St) -> impl StopToken`.
|
||||
/// Must implement `FnMut(&mut St) -> StopToken`.
|
||||
pub stop_token: Sf,
|
||||
|
||||
/// The function used as [`UpdateListener::hint_allowed_updates`].
|
||||
|
@ -68,41 +66,6 @@ impl<St, Assf, Sf, Hauf, Thf> StatefulListener<St, Assf, Sf, Hauf, Thf> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, E>
|
||||
StatefulListener<
|
||||
S,
|
||||
for<'a> fn(&'a mut S) -> &'a mut S,
|
||||
for<'a> fn(&'a mut S) -> stop_token::Noop,
|
||||
Haufn<S>,
|
||||
Thfn<S>,
|
||||
>
|
||||
where
|
||||
S: Stream<Item = Result<Update, E>> + Unpin + Send + 'static,
|
||||
{
|
||||
/// Creates a new update listener from a stream of updates which ignores
|
||||
/// stop signals.
|
||||
///
|
||||
/// It won't be possible to ever stop this listener with a stop token.
|
||||
pub fn from_stream_without_graceful_shutdown(stream: S) -> Self {
|
||||
let this = Self::new_with_hints(
|
||||
stream,
|
||||
|s| s,
|
||||
|_| stop_token::Noop,
|
||||
None,
|
||||
Some(|_| {
|
||||
// FIXME: replace this by just Duration::MAX once 1.53 releases
|
||||
// be released
|
||||
const NANOS_PER_SEC: u32 = 1_000_000_000;
|
||||
let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1);
|
||||
|
||||
Some(dmax)
|
||||
}),
|
||||
);
|
||||
|
||||
assert_update_listener(this)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a>
|
||||
for StatefulListener<St, Assf, Hauf, Sf, Thf>
|
||||
where
|
||||
|
@ -119,18 +82,16 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<St, Assf, Sf, Hauf, Stt, Thf, E> UpdateListener for StatefulListener<St, Assf, Sf, Hauf, Thf>
|
||||
impl<St, Assf, Sf, Hauf, Thf, E> UpdateListener for StatefulListener<St, Assf, Sf, Hauf, Thf>
|
||||
where
|
||||
Self: for<'a> AsUpdateStream<'a, StreamErr = E>,
|
||||
Sf: FnMut(&mut St) -> Stt,
|
||||
Stt: StopToken + Send,
|
||||
Sf: FnMut(&mut St) -> StopToken,
|
||||
Hauf: FnMut(&mut St, &mut dyn Iterator<Item = AllowedUpdate>),
|
||||
Thf: Fn(&St) -> Option<Duration>,
|
||||
{
|
||||
type Err = E;
|
||||
type StopToken = Stt;
|
||||
|
||||
fn stop_token(&mut self) -> Stt {
|
||||
fn stop_token(&mut self) -> StopToken {
|
||||
(self.stop_token)(&mut self.state)
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,9 @@ use axum::{
|
|||
};
|
||||
|
||||
use crate::{
|
||||
dispatching::{
|
||||
stop_token::{AsyncStopFlag, StopToken},
|
||||
update_listeners::{webhooks::Options, UpdateListener},
|
||||
},
|
||||
dispatching::update_listeners::{webhooks::Options, UpdateListener},
|
||||
requests::Requester,
|
||||
stop::StopFlag,
|
||||
};
|
||||
|
||||
/// Webhook implementation based on the [mod@axum] framework.
|
||||
|
@ -22,7 +20,7 @@ use crate::{
|
|||
///
|
||||
/// [`set_webhook`]: crate::payloads::SetWebhook
|
||||
/// [`delete_webhook`]: crate::payloads::DeleteWebhook
|
||||
/// [`stop`]: StopToken::stop
|
||||
/// [`stop`]: crate::stop::StopToken::stop
|
||||
///
|
||||
/// ## Panics
|
||||
///
|
||||
|
@ -88,7 +86,7 @@ where
|
|||
///
|
||||
/// [`set_webhook`]: crate::payloads::SetWebhook
|
||||
/// [`delete_webhook`]: crate::payloads::DeleteWebhook
|
||||
/// [`stop`]: StopToken::stop
|
||||
/// [`stop`]: crate::stop::StopToken::stop
|
||||
/// [`options.address`]: Options::address
|
||||
/// [`with_graceful_shutdown`]: axum::Server::with_graceful_shutdown
|
||||
///
|
||||
|
@ -156,10 +154,8 @@ pub fn axum_no_setup(
|
|||
options: Options,
|
||||
) -> (impl UpdateListener<Err = Infallible>, impl Future<Output = ()>, axum::Router) {
|
||||
use crate::{
|
||||
dispatching::{
|
||||
stop_token::AsyncStopToken,
|
||||
update_listeners::{self, webhooks::tuple_first_mut},
|
||||
},
|
||||
dispatching::update_listeners::{self, webhooks::tuple_first_mut},
|
||||
stop::{mk_stop_token, StopToken},
|
||||
types::Update,
|
||||
};
|
||||
use axum::{extract::Extension, response::IntoResponse, routing::post};
|
||||
|
@ -178,7 +174,7 @@ pub fn axum_no_setup(
|
|||
secret_header: XTelegramBotApiSecretToken,
|
||||
secret: Extension<Option<String>>,
|
||||
tx: Extension<CSender>,
|
||||
flag: Extension<AsyncStopFlag>,
|
||||
flag: Extension<StopFlag>,
|
||||
) -> impl IntoResponse {
|
||||
// FIXME: use constant time comparison here
|
||||
if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
|
||||
|
@ -214,7 +210,7 @@ pub fn axum_no_setup(
|
|||
StatusCode::OK
|
||||
}
|
||||
|
||||
let (stop_token, stop_flag) = AsyncStopToken::new_pair();
|
||||
let (stop_token, stop_flag) = mk_stop_token();
|
||||
|
||||
let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer(
|
||||
ServiceBuilder::new()
|
||||
|
@ -231,7 +227,7 @@ pub fn axum_no_setup(
|
|||
let listener = update_listeners::StatefulListener::new(
|
||||
(stream, stop_token),
|
||||
tuple_first_mut,
|
||||
|state: &mut (_, AsyncStopToken)| state.1.clone(),
|
||||
|state: &mut (_, StopToken)| state.1.clone(),
|
||||
);
|
||||
|
||||
(listener, stop_flag, app)
|
||||
|
|
|
@ -67,6 +67,7 @@ mod logging;
|
|||
pub mod dispatching;
|
||||
pub mod error_handlers;
|
||||
pub mod prelude;
|
||||
pub mod stop;
|
||||
pub mod utils;
|
||||
|
||||
#[doc(inline)]
|
||||
|
|
64
src/stop.rs
Normal file
64
src/stop.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
//! This module contains stop [token] and stop [flag] that are used to stop
|
||||
//! async tasks, for example [listeners].
|
||||
//!
|
||||
//! [token]: StopToken
|
||||
//! [flag]: StopFlag
|
||||
//! [listeners]: crate::dispatching::update_listeners
|
||||
|
||||
use std::{convert::Infallible, future::Future, pin::Pin, task};
|
||||
|
||||
use futures::future::{pending, AbortHandle, Abortable, Pending};
|
||||
|
||||
/// Create a new token/flag pair.
|
||||
#[must_use]
|
||||
pub fn mk_stop_token() -> (StopToken, StopFlag) {
|
||||
let (handle, reg) = AbortHandle::new_pair();
|
||||
let token = StopToken(handle);
|
||||
let flag = StopFlag(Abortable::new(pending(), reg));
|
||||
|
||||
(token, flag)
|
||||
}
|
||||
|
||||
/// A stop token which corresponds to a [`StopFlag`].
|
||||
#[derive(Clone)]
|
||||
pub struct StopToken(AbortHandle);
|
||||
|
||||
/// A flag which corresponds to [`StopToken`].
|
||||
///
|
||||
/// To know if the stop token was used you can either repeatedly call
|
||||
/// [`is_stopped`] or use this type as a `Future`.
|
||||
///
|
||||
/// [`is_stopped`]: StopFlag::is_stopped
|
||||
#[pin_project::pin_project]
|
||||
#[derive(Clone)]
|
||||
pub struct StopFlag(#[pin] Abortable<Pending<Infallible>>);
|
||||
|
||||
impl StopToken {
|
||||
/// "Stops" the flag associated with this token.
|
||||
///
|
||||
/// Note that calling this function multiple times does nothing, only the
|
||||
/// first call changes the state.
|
||||
pub fn stop(&self) {
|
||||
self.0.abort()
|
||||
}
|
||||
}
|
||||
|
||||
impl StopFlag {
|
||||
/// Returns true if the stop token linked to `self` was used.
|
||||
#[must_use]
|
||||
pub fn is_stopped(&self) -> bool {
|
||||
self.0.is_aborted()
|
||||
}
|
||||
}
|
||||
|
||||
/// This future resolves when a stop token was used.
|
||||
impl Future for StopFlag {
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
|
||||
self.project().0.poll(cx).map(|res| match res {
|
||||
Err(_aborted) => (),
|
||||
Ok(unreachable) => match unreachable {},
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue