From 8917e05bf8f450696f622e1b06be157641136be4 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 5 Sep 2022 18:52:24 +0400 Subject: [PATCH 1/2] Turn `UpdateListener`'s generic error into an associated type --- CHANGELOG.md | 5 +++++ src/dispatching/dispatcher.rs | 8 +++---- src/dispatching/repls/commands_repl.rs | 6 ++--- src/dispatching/repls/repl.rs | 6 ++--- src/dispatching/update_listeners.rs | 22 ++++++++++++++----- src/dispatching/update_listeners/polling.rs | 6 +++-- .../update_listeners/stateful_listener.rs | 18 +++++---------- .../update_listeners/webhooks/axum.rs | 12 +++++++--- src/utils/shutdown_token.rs | 2 +- 9 files changed, 52 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a82450e..0c21257f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## unreleased +### Changed + +- `UpdateListener` now has an associated type `Err` instead of a generic +- `AsUpdateStream` now has an associated type `StreamErr` instead of a generic + ## 0.10.1 - 2022-07-22 ### Fixed diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 9e009401..42ae20bd 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -287,14 +287,14 @@ where /// This method adds the same dependencies as [`Dispatcher::dispatch`]. /// /// [`shutdown`]: ShutdownToken::shutdown - pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( + pub async fn dispatch_with_listener<'a, UListener, Eh>( &'a mut self, mut update_listener: UListener, update_listener_error_handler: Arc, ) where - UListener: UpdateListener + 'a, - Eh: ErrorHandler + 'a, - ListenerE: Debug, + UListener: UpdateListener + 'a, + Eh: ErrorHandler + 'a, + UListener::Err: Debug, { // FIXME: there should be a way to check if dependency is already inserted let me = self.bot.get_me().send().await.expect("Failed to retrieve 'me'"); diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index fb89b21a..ceeeb917 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -78,7 +78,7 @@ where /// [`commands_repl`]: crate::dispatching::repls::commands_repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener #[cfg(feature = "ctrlc_handler")] -pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>( +pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, E, Args>( bot: R, handler: H, listener: L, @@ -86,8 +86,8 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>( ) where Cmd: BotCommands + Send + Sync + 'static, H: Injectable, Args> + Send + Sync + 'static, - L: UpdateListener + Send + 'a, - ListenerE: Debug + Send + 'a, + L: UpdateListener + Send + 'a, + L::Err: Debug + Send + 'a, R: Requester + Clone + Send + Sync + 'static, E: Debug + Send + Sync + 'static, { diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index 8ae7cbba..529cc711 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -60,11 +60,11 @@ where /// [`repl`]: crate::dispatching::repls::repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener #[cfg(feature = "ctrlc_handler")] -pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(bot: R, handler: H, listener: L) +pub async fn repl_with_listener<'a, R, H, E, L, Args>(bot: R, handler: H, listener: L) where H: Injectable, Args> + Send + Sync + 'static, - L: UpdateListener + Send + 'a, - ListenerE: Debug, + L: UpdateListener + Send + 'a, + L::Err: Debug, Result<(), E>: OnError, E: Debug + Send + Sync + 'static, R: Requester + Clone + Send + Sync + 'static, diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 9c4abc2f..f7153ee1 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -59,7 +59,12 @@ pub use self::{ /// - [`AsUpdateStream::as_stream`] /// /// [module-level documentation]: mod@self -pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { +pub trait UpdateListener: + for<'a> AsUpdateStream<'a, StreamErr = ::Err> +{ + /// The type of errors that can be returned from this listener. + type Err; + /// The type of token which allows to stop this listener. type StopToken: StopToken + Send; @@ -110,7 +115,14 @@ pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { /// [`UpdateListener`]'s supertrait/extension. /// /// This trait is a workaround to not require GAT. -pub trait AsUpdateStream<'a, E> { +pub trait AsUpdateStream<'a> { + /// Error that can be returned from the [`Stream`] + /// + /// [`Stream`]: AsUpdateStream::Stream + // NB: This should be named differently to `UpdateListener::Err`, so that it's + // unambiguous + type StreamErr; + /// The stream of updates from Telegram. // HACK: There is currently no way to write something like // `-> impl for<'a> AsUpdateStream<'a, E, Stream: Send>`. Since we return @@ -119,7 +131,7 @@ pub trait AsUpdateStream<'a, E> { // // Without this it's, for example, impossible to spawn a tokio task with // teloxide polling. - type Stream: Stream> + Send + 'a; + type Stream: Stream> + Send + 'a; /// Creates the update [`Stream`]. /// @@ -128,9 +140,9 @@ pub trait AsUpdateStream<'a, E> { } #[inline(always)] -pub(crate) fn assert_update_listener(listener: L) -> L +pub(crate) fn assert_update_listener(listener: L) -> L where - L: UpdateListener, + L: UpdateListener, { listener } diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 118fc02e..74824bba 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -291,7 +291,8 @@ pub struct PollingStream<'a, B: Requester> { in_flight: Option<::Send>, } -impl UpdateListener for Polling { +impl UpdateListener for Polling { + type Err = B::Err; type StopToken = AsyncStopToken; fn stop_token(&mut self) -> Self::StopToken { @@ -309,7 +310,8 @@ impl UpdateListener for Polling { } } -impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling { +impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a> for Polling { + type StreamErr = B::Err; type Stream = PollingStream<'a, B>; fn as_stream(&'a mut self) -> Self::Stream { diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs index e37a9efc..7f23d795 100644 --- a/src/dispatching/update_listeners/stateful_listener.rs +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -5,7 +5,7 @@ use futures::Stream; use crate::{ dispatching::{ stop_token::{self, StopToken}, - update_listeners::{AsUpdateStream, UpdateListener}, + update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, }, types::{AllowedUpdate, Update}, }; @@ -103,7 +103,7 @@ where } } -impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a, E> +impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a> for StatefulListener where (St, Strm): 'a, @@ -111,6 +111,7 @@ where Assf: FnMut(&'a mut St) -> Strm, Strm: Stream>, { + type StreamErr = E; type Stream = Strm; fn as_stream(&'a mut self) -> Self::Stream { @@ -118,15 +119,15 @@ where } } -impl UpdateListener - for StatefulListener +impl UpdateListener for StatefulListener where - Self: for<'a> AsUpdateStream<'a, E>, + Self: for<'a> AsUpdateStream<'a, StreamErr = E>, Sf: FnMut(&mut St) -> Stt, Stt: StopToken + Send, Hauf: FnMut(&mut St, &mut dyn Iterator), Thf: Fn(&St) -> Option, { + type Err = E; type StopToken = Stt; fn stop_token(&mut self) -> Stt { @@ -143,10 +144,3 @@ where self.timeout_hint.as_ref().and_then(|f| f(&self.state)) } } - -fn assert_update_listener(l: L) -> L -where - L: UpdateListener, -{ - l -} diff --git a/src/dispatching/update_listeners/webhooks/axum.rs b/src/dispatching/update_listeners/webhooks/axum.rs index 6c16bdf0..82669a0f 100644 --- a/src/dispatching/update_listeners/webhooks/axum.rs +++ b/src/dispatching/update_listeners/webhooks/axum.rs @@ -38,7 +38,10 @@ use crate::{ /// /// [`axum_to_router`] and [`axum_no_setup`] for lower-level versions of this /// function. -pub async fn axum(bot: R, options: Options) -> Result, R::Err> +pub async fn axum( + bot: R, + options: Options, +) -> Result, R::Err> where R: Requester + Send + 'static, ::DeleteWebhook: Send, @@ -107,7 +110,10 @@ where pub async fn axum_to_router( bot: R, mut options: Options, -) -> Result<(impl UpdateListener, impl Future + Send, axum::Router), R::Err> +) -> Result< + (impl UpdateListener, impl Future + Send, axum::Router), + R::Err, +> where R: Requester + Send, ::DeleteWebhook: Send, @@ -148,7 +154,7 @@ where /// function. pub fn axum_no_setup( options: Options, -) -> (impl UpdateListener, impl Future, axum::Router) { +) -> (impl UpdateListener, impl Future, axum::Router) { use crate::{ dispatching::{ stop_token::AsyncStopToken, diff --git a/src/utils/shutdown_token.rs b/src/utils/shutdown_token.rs index 2447f6fd..998a64ae 100644 --- a/src/utils/shutdown_token.rs +++ b/src/utils/shutdown_token.rs @@ -93,7 +93,7 @@ impl fmt::Display for IdleShutdownError { impl std::error::Error for IdleShutdownError {} -pub(crate) fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { +pub(crate) fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); const DZERO: Duration = Duration::ZERO; From 11fe8baebff1855b7897199071d8ccd1b4b9fa96 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 5 Sep 2022 18:56:24 +0400 Subject: [PATCH 2/2] Correct a comment --- src/dispatching/update_listeners.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index f7153ee1..42169037 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -124,13 +124,9 @@ pub trait AsUpdateStream<'a> { type StreamErr; /// The stream of updates from Telegram. - // HACK: There is currently no way to write something like - // `-> impl for<'a> AsUpdateStream<'a, E, Stream: Send>`. Since we return - // `impl UpdateListener` from `polling`, we need to have `Send` bound here, - // to make the stream `Send`. - // - // Without this it's, for example, impossible to spawn a tokio task with - // teloxide polling. + // NB: `Send` is not strictly required here, but it makes it easier to return + // `impl AsUpdateStream` and also you want `Send` streams almost (?) always + // anyway. type Stream: Stream> + Send + 'a; /// Creates the update [`Stream`].