Fix error handlers

This commit is contained in:
Temirkhan Myrzamadi 2020-02-18 06:19:50 +06:00
parent d23b226e86
commit f20932a730
6 changed files with 159 additions and 162 deletions

View file

@ -47,25 +47,25 @@ where
/// ///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[must_use] #[must_use]
pub fn new(handler: H) -> Arc<Self> { pub fn new(handler: H) -> Self {
Arc::new(Self { Self {
storage: InMemStorage::new(), storage: InMemStorage::new(),
handler: Arc::new(handler), handler: Arc::new(handler),
senders: Arc::new(Map::new()), senders: Arc::new(Map::new()),
}) }
} }
/// Creates a dispatcher with the specified `handler` and `storage`. /// Creates a dispatcher with the specified `handler` and `storage`.
#[must_use] #[must_use]
pub fn with_storage<Stg>(handler: H, storage: Arc<Stg>) -> Arc<Self> pub fn with_storage<Stg>(handler: H, storage: Arc<Stg>) -> Self
where where
Stg: Storage<D> + Send + Sync + 'static, Stg: Storage<D> + Send + Sync + 'static,
{ {
Arc::new(Self { Self {
storage, storage,
handler: Arc::new(handler), handler: Arc::new(handler),
senders: Arc::new(Map::new()), senders: Arc::new(Map::new()),
}) }
} }
#[must_use] #[must_use]

View file

@ -1,6 +1,5 @@
use std::pin::Pin;
use crate::prelude::{DialogueDispatcherHandlerCtx, DialogueStage}; use crate::prelude::{DialogueDispatcherHandlerCtx, DialogueStage};
use futures::future::BoxFuture;
use std::{future::Future, sync::Arc}; use std::{future::Future, sync::Arc};
/// An asynchronous handler of an update used in [`DialogueDispatcher`]. /// An asynchronous handler of an update used in [`DialogueDispatcher`].
@ -14,7 +13,7 @@ pub trait DialogueDispatcherHandler<Upd, D> {
fn handle( fn handle(
self: Arc<Self>, self: Arc<Self>,
ctx: DialogueDispatcherHandlerCtx<Upd, D>, ctx: DialogueDispatcherHandlerCtx<Upd, D>,
) -> Pin<Box<dyn Future<Output = DialogueStage<D>> + Send + 'static>> ) -> BoxFuture<'static, DialogueStage<D>>
where where
DialogueDispatcherHandlerCtx<Upd, D>: Send + 'static; DialogueDispatcherHandlerCtx<Upd, D>: Send + 'static;
} }
@ -27,7 +26,7 @@ where
fn handle( fn handle(
self: Arc<Self>, self: Arc<Self>,
ctx: DialogueDispatcherHandlerCtx<Upd, D>, ctx: DialogueDispatcherHandlerCtx<Upd, D>,
) -> Pin<Box<dyn Future<Output = Fut::Output> + Send + 'static>> ) -> BoxFuture<'static, Fut::Output>
where where
DialogueDispatcherHandlerCtx<Upd, D>: Send + 'static, DialogueDispatcherHandlerCtx<Upd, D>: Send + 'static,
{ {

View file

@ -211,7 +211,7 @@ impl Dispatcher {
pub async fn dispatch(&self) { pub async fn dispatch(&self) {
self.dispatch_with_listener( self.dispatch_with_listener(
update_listeners::polling_default(Arc::clone(&self.bot)), update_listeners::polling_default(Arc::clone(&self.bot)),
&LoggingErrorHandler::new("An error from the update listener"), LoggingErrorHandler::new("An error from the update listener"),
) )
.await; .await;
} }
@ -221,7 +221,7 @@ impl Dispatcher {
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a self, &'a self,
update_listener: UListener, update_listener: UListener,
update_listener_error_handler: &'a Eh, update_listener_error_handler: Arc<Eh>,
) where ) where
UListener: UpdateListener<ListenerE> + 'a, UListener: UpdateListener<ListenerE> + 'a,
Eh: ErrorHandler<ListenerE> + 'a, Eh: ErrorHandler<ListenerE> + 'a,
@ -230,105 +230,112 @@ impl Dispatcher {
let update_listener = Box::pin(update_listener); let update_listener = Box::pin(update_listener);
update_listener update_listener
.for_each(move |update| async move { .for_each(move |update| {
log::trace!("Dispatcher received an update: {:?}", update); let update_listener_error_handler =
Arc::clone(&update_listener_error_handler);
let update = match update { async move {
Ok(update) => update, log::trace!("Dispatcher received an update: {:?}", update);
Err(error) => {
update_listener_error_handler.handle_error(error).await;
return;
}
};
match update.kind { let update = match update {
UpdateKind::Message(message) => { Ok(update) => update,
send!( Err(error) => {
&self.bot, Arc::clone(&update_listener_error_handler)
&self.messages_queue, .handle_error(error)
message, .await;
UpdateKind::Message return;
); }
} };
UpdateKind::EditedMessage(message) => {
send!( match update.kind {
&self.bot, UpdateKind::Message(message) => {
&self.edited_messages_queue, send!(
message, &self.bot,
UpdateKind::EditedMessage &self.messages_queue,
); message,
} UpdateKind::Message
UpdateKind::ChannelPost(post) => { );
send!( }
&self.bot, UpdateKind::EditedMessage(message) => {
&self.channel_posts_queue, send!(
post, &self.bot,
UpdateKind::ChannelPost &self.edited_messages_queue,
); message,
} UpdateKind::EditedMessage
UpdateKind::EditedChannelPost(post) => { );
send!( }
&self.bot, UpdateKind::ChannelPost(post) => {
&self.edited_channel_posts_queue, send!(
post, &self.bot,
UpdateKind::EditedChannelPost &self.channel_posts_queue,
); post,
} UpdateKind::ChannelPost
UpdateKind::InlineQuery(query) => { );
send!( }
&self.bot, UpdateKind::EditedChannelPost(post) => {
&self.inline_queries_queue, send!(
query, &self.bot,
UpdateKind::InlineQuery &self.edited_channel_posts_queue,
); post,
} UpdateKind::EditedChannelPost
UpdateKind::ChosenInlineResult(result) => { );
send!( }
&self.bot, UpdateKind::InlineQuery(query) => {
&self.chosen_inline_results_queue, send!(
result, &self.bot,
UpdateKind::ChosenInlineResult &self.inline_queries_queue,
); query,
} UpdateKind::InlineQuery
UpdateKind::CallbackQuery(query) => { );
send!( }
&self.bot, UpdateKind::ChosenInlineResult(result) => {
&self.callback_queries_queue, send!(
query, &self.bot,
UpdateKind::CallbackQuer &self.chosen_inline_results_queue,
); result,
} UpdateKind::ChosenInlineResult
UpdateKind::ShippingQuery(query) => { );
send!( }
&self.bot, UpdateKind::CallbackQuery(query) => {
&self.shipping_queries_queue, send!(
query, &self.bot,
UpdateKind::ShippingQuery &self.callback_queries_queue,
); query,
} UpdateKind::CallbackQuer
UpdateKind::PreCheckoutQuery(query) => { );
send!( }
&self.bot, UpdateKind::ShippingQuery(query) => {
&self.pre_checkout_queries_queue, send!(
query, &self.bot,
UpdateKind::PreCheckoutQuery &self.shipping_queries_queue,
); query,
} UpdateKind::ShippingQuery
UpdateKind::Poll(poll) => { );
send!( }
&self.bot, UpdateKind::PreCheckoutQuery(query) => {
&self.polls_queue, send!(
poll, &self.bot,
UpdateKind::Poll &self.pre_checkout_queries_queue,
); query,
} UpdateKind::PreCheckoutQuery
UpdateKind::PollAnswer(answer) => { );
send!( }
&self.bot, UpdateKind::Poll(poll) => {
&self.poll_answers_queue, send!(
answer, &self.bot,
UpdateKind::PollAnswer &self.polls_queue,
); poll,
UpdateKind::Poll
);
}
UpdateKind::PollAnswer(answer) => {
send!(
&self.bot,
&self.poll_answers_queue,
answer,
UpdateKind::PollAnswer
);
}
} }
} }
}) })

View file

@ -1,6 +1,7 @@
use std::{future::Future, pin::Pin}; use std::future::Future;
use crate::dispatching::{DispatcherHandlerCtx, DispatcherHandlerRx}; use crate::dispatching::{DispatcherHandlerCtx, DispatcherHandlerRx};
use futures::future::BoxFuture;
/// An asynchronous handler of a stream of updates used in [`Dispatcher`]. /// An asynchronous handler of a stream of updates used in [`Dispatcher`].
/// ///
@ -13,7 +14,7 @@ pub trait DispatcherHandler<Upd> {
fn handle( fn handle(
self, self,
updates: DispatcherHandlerRx<Upd>, updates: DispatcherHandlerRx<Upd>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> ) -> BoxFuture<'static, ()>
where where
DispatcherHandlerCtx<Upd>: Send + 'static; DispatcherHandlerCtx<Upd>: Send + 'static;
} }
@ -23,10 +24,7 @@ where
F: FnOnce(DispatcherHandlerRx<Upd>) -> Fut + Send + 'static, F: FnOnce(DispatcherHandlerRx<Upd>) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
fn handle( fn handle(self, updates: DispatcherHandlerRx<Upd>) -> BoxFuture<'static, ()>
self,
updates: DispatcherHandlerRx<Upd>,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
where where
DispatcherHandlerCtx<Upd>: Send + 'static, DispatcherHandlerCtx<Upd>: Send + 'static,
{ {

View file

@ -1,4 +1,5 @@
use std::{convert::Infallible, fmt::Debug, future::Future, pin::Pin}; use futures::future::BoxFuture;
use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc};
/// An asynchronous handler of an error. /// An asynchronous handler of an error.
/// ///
@ -6,26 +7,16 @@ use std::{convert::Infallible, fmt::Debug, future::Future, pin::Pin};
/// overview](crate::dispatching). /// overview](crate::dispatching).
pub trait ErrorHandler<E> { pub trait ErrorHandler<E> {
#[must_use] #[must_use]
fn handle_error<'a>( fn handle_error(self: Arc<Self>, error: E) -> BoxFuture<'static, ()>;
&'a self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a;
} }
impl<E, F, Fut> ErrorHandler<E> for F impl<E, F, Fut> ErrorHandler<E> for F
where where
F: Fn(E) -> Fut, F: Fn(E) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()>, E: Send + 'static,
Fut: Future<Output = ()> + Send,
{ {
fn handle_error<'a>( fn handle_error(self: Arc<Self>, error: E) -> BoxFuture<'static, ()> {
&'a self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async move { self(error).await }) Box::pin(async move { self(error).await })
} }
} }
@ -38,21 +29,22 @@ where
/// # async fn main_() { /// # async fn main_() {
/// use teloxide::dispatching::{ErrorHandler, IgnoringErrorHandler}; /// use teloxide::dispatching::{ErrorHandler, IgnoringErrorHandler};
/// ///
/// IgnoringErrorHandler.handle_error(()).await; /// IgnoringErrorHandler::new().handle_error(()).await;
/// IgnoringErrorHandler.handle_error(404).await; /// IgnoringErrorHandler::new().handle_error(404).await;
/// IgnoringErrorHandler.handle_error("error").await; /// IgnoringErrorHandler::new().handle_error("error").await;
/// # } /// # }
/// ``` /// ```
pub struct IgnoringErrorHandler; pub struct IgnoringErrorHandler;
impl IgnoringErrorHandler {
#[must_use]
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl<E> ErrorHandler<E> for IgnoringErrorHandler { impl<E> ErrorHandler<E> for IgnoringErrorHandler {
fn handle_error<'a>( fn handle_error(self: Arc<Self>, _: E) -> BoxFuture<'static, ()> {
&'a self,
_: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async {}) Box::pin(async {})
} }
} }
@ -71,10 +63,10 @@ impl<E> ErrorHandler<E> for IgnoringErrorHandler {
/// let result: Result<String, Infallible> = "str".try_into(); /// let result: Result<String, Infallible> = "str".try_into();
/// match result { /// match result {
/// Ok(string) => println!("{}", string), /// Ok(string) => println!("{}", string),
/// Err(inf) => IgnoringErrorHandlerSafe.handle_error(inf).await, /// Err(inf) => IgnoringErrorHandlerSafe::new().handle_error(inf).await,
/// } /// }
/// ///
/// IgnoringErrorHandlerSafe.handle_error(return).await; // return type of `return` is `!` (aka never) /// IgnoringErrorHandlerSafe::new().handle_error(return).await; // return type of `return` is `!` (aka never)
/// # } /// # }
/// ``` /// ```
/// ///
@ -88,15 +80,16 @@ impl<E> ErrorHandler<E> for IgnoringErrorHandler {
/// [`Infallible`]: std::convert::Infallible /// [`Infallible`]: std::convert::Infallible
pub struct IgnoringErrorHandlerSafe; pub struct IgnoringErrorHandlerSafe;
impl IgnoringErrorHandlerSafe {
#[must_use]
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
#[allow(unreachable_code)] #[allow(unreachable_code)]
impl ErrorHandler<Infallible> for IgnoringErrorHandlerSafe { impl ErrorHandler<Infallible> for IgnoringErrorHandlerSafe {
fn handle_error<'a>( fn handle_error(self: Arc<Self>, _: Infallible) -> BoxFuture<'static, ()> {
&'a self,
_: Infallible,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
Infallible: 'a,
{
Box::pin(async {}) Box::pin(async {})
} }
} }
@ -109,14 +102,13 @@ impl ErrorHandler<Infallible> for IgnoringErrorHandlerSafe {
/// # async fn main_() { /// # async fn main_() {
/// use teloxide::dispatching::{ErrorHandler, LoggingErrorHandler}; /// use teloxide::dispatching::{ErrorHandler, LoggingErrorHandler};
/// ///
/// LoggingErrorHandler::default().handle_error(()).await; /// LoggingErrorHandler::empty().handle_error(()).await;
/// LoggingErrorHandler::new("error").handle_error(404).await; /// LoggingErrorHandler::new("error").handle_error(404).await;
/// LoggingErrorHandler::new("error") /// LoggingErrorHandler::new("error")
/// .handle_error("Invalid data type!") /// .handle_error("Invalid data type!")
/// .await; /// .await;
/// # } /// # }
/// ``` /// ```
#[derive(Default)]
pub struct LoggingErrorHandler { pub struct LoggingErrorHandler {
text: String, text: String,
} }
@ -126,11 +118,17 @@ impl LoggingErrorHandler {
/// ///
/// The logs will be printed in this format: `{text}: {:?}`. /// The logs will be printed in this format: `{text}: {:?}`.
#[must_use] #[must_use]
pub fn new<T>(text: T) -> Self pub fn new<T>(text: T) -> Arc<Self>
where where
T: Into<String>, T: Into<String>,
{ {
Self { text: text.into() } Arc::new(Self { text: text.into() })
}
/// A shortcut for `LoggingErrorHandler::new("Error".to_owned())`.
#[must_use]
pub fn empty() -> Arc<Self> {
Self::new("Error".to_owned())
} }
} }
@ -138,13 +136,7 @@ impl<E> ErrorHandler<E> for LoggingErrorHandler
where where
E: Debug, E: Debug,
{ {
fn handle_error<'a>( fn handle_error(self: Arc<Self>, error: E) -> BoxFuture<'static, ()> {
&'a self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
log::error!("{text}: {:?}", error, text = self.text); log::error!("{text}: {:?}", error, text = self.text);
Box::pin(async {}) Box::pin(async {})
} }

View file

@ -6,7 +6,8 @@ pub use crate::{
exit, next, DialogueDispatcher, DialogueDispatcherHandlerCtx, exit, next, DialogueDispatcher, DialogueDispatcherHandlerCtx,
DialogueStage, GetChatId, DialogueStage, GetChatId,
}, },
Dispatcher, DispatcherHandlerCtx, DispatcherHandlerRx, LoggingErrorHandler, ErrorHandler Dispatcher, DispatcherHandlerCtx, DispatcherHandlerRx, ErrorHandler,
LoggingErrorHandler,
}, },
requests::{Request, ResponseResult}, requests::{Request, ResponseResult},
types::{Message, Update}, types::{Message, Update},