git push origin devMerge branch 'redesign_dispatching' into dev

This commit is contained in:
Temirkhan Myrzamadi 2019-12-31 18:43:23 +06:00
commit 934b956165
13 changed files with 724 additions and 713 deletions

View file

@ -18,6 +18,7 @@ futures-preview = "0.3.0-alpha.19"
async-trait = "0.1.13"
thiserror = "1.0.2"
serde_with_macros = "1.0.1"
either = "1.5.3"
[features]
default = []

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

View file

@ -76,10 +76,7 @@ impl Bot {
/// # use teloxide::{Bot, requests::payloads::SendAnimation, types::InputFile};
/// # #[tokio::main] async fn main_() {
/// let bot = Bot::new("TOKEN");
/// let payload = SendAnimation::new(
/// 123456,
/// InputFile::Url(String::from("https://example.com"))
/// );
/// let payload = SendAnimation::new(123456, InputFile::Url(String::from("https://example.com")));
/// bot.execute_multipart(&payload).await;
/// # }
/// ```

View file

@ -1,119 +0,0 @@
// Infallible used here instead of `!` to be compatible with rust <1.41
use std::{convert::Infallible, future::Future, pin::Pin};
use async_trait::async_trait;
/// Implementors of this trait are treated as error-handlers.
#[async_trait]
pub trait ErrorPolicy<E> {
async fn handle_error(&self, error: E)
where
E: 'async_trait;
}
/// Error policy that silently ignores all errors
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, Ignore,
/// };
///
/// Ignore.handle_error(()).await;
/// Ignore.handle_error(404).await;
/// Ignore.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Ignore;
#[async_trait]
impl<E> ErrorPolicy<E> for Ignore
where
E: Send,
{
async fn handle_error(&self, _: E)
where
E: 'async_trait,
{
}
}
/// Error policy that silently ignores all errors that can never happen (e.g.:
/// [`!`] or [`Infallible`])
///
/// ## Examples
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use std::convert::{TryInto, Infallible};
///
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy,
/// IgnoreSafe,
/// };
///
/// let result: Result<String, Infallible> = "str".try_into();
/// match result {
/// Ok(string) => println!("{}", string),
/// Err(inf) => IgnoreSafe.handle_error(inf).await,
/// }
///
/// IgnoreSafe.handle_error(return).await; // return type of `return` is `!` (aka never)
/// # }
/// ```
///
/// ```compile_fail
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, IgnoreSafe,
/// };
///
/// IgnoreSafe.handle_error(0);
/// ```
///
/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html
/// [`Infallible`]: std::convert::Infallible
pub struct IgnoreSafe;
#[allow(unreachable_code)]
#[async_trait]
impl ErrorPolicy<Infallible> for IgnoreSafe {
async fn handle_error(&self, _: Infallible)
where
Infallible: 'async_trait,
{
}
}
/// Implementation of `ErrorPolicy` for `async fn`s
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::dispatchers::filter::error_policy::ErrorPolicy;
///
/// let closure = |e: i32| async move { eprintln!("Error code{}", e) };
///
/// closure.handle_error(404).await;
/// # }
/// ```
impl<E, F, Fut> ErrorPolicy<E> for F
where
F: Fn(E) -> Fut + Sync,
Fut: Future<Output = ()> + Send,
E: Send,
{
fn handle_error<'s, 'async_trait>(
&'s self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where
's: 'async_trait,
Self: 'async_trait,
E: 'async_trait,
{
Box::pin(async move { self(error).await })
}
}

View file

@ -1,384 +0,0 @@
use futures::StreamExt;
use async_trait::async_trait;
use crate::{
dispatching::{
dispatchers::filter::error_policy::ErrorPolicy, filters::Filter,
handler::Handler, updater::Updater, Dispatcher,
},
types::{
CallbackQuery, ChosenInlineResult, InlineQuery, Message, Update,
UpdateKind,
},
};
pub mod error_policy;
struct FilterAndHandler<'a, T, E> {
filter: Box<dyn Filter<T> + 'a>,
handler: Box<dyn Handler<'a, T, E> + 'a>,
}
impl<'a, T, E> FilterAndHandler<'a, T, E> {
fn new<F, H>(filter: F, handler: H) -> Self
where
F: Filter<T> + 'a,
H: Handler<'a, T, E> + 'a,
{
FilterAndHandler {
filter: Box::new(filter),
handler: Box::new(handler),
}
}
}
type FiltersAndHandlers<'a, T, E> = Vec<FilterAndHandler<'a, T, E>>;
/// Dispatcher that dispatches updates from telegram.
///
/// This is 'filter' implementation with following limitations:
/// - Error (`E` generic parameter) _must_ implement [`std::fmt::Debug`]
/// - All 'handlers' are boxed
/// - Handler's fututres are also boxed
/// - All errors from [updater] are ignored (TODO: remove this limitation)
/// - All handlers executed in order (this means that in dispatching have 2
/// upadtes it will first execute some handler into complition with first
/// update and **then** search for handler for second update, this is probably
/// wrong)
///
/// ## Examples
///
/// Simplest example:
/// ```no_run
/// # async fn run() {
/// use std::convert::Infallible;
///
/// use teloxide::{
/// dispatching::{
/// dispatchers::filter::{
/// error_policy::ErrorPolicy, FilterDispatcher,
/// },
/// updater::polling,
/// },
/// types::Message,
/// Bot,
/// };
///
/// async fn handle_edited_message(mes: Message) {
/// println!("Edited message: {:?}", mes)
/// }
///
/// let bot = Bot::new("TOKEN");
///
/// // create dispatching which handlers can't fail
/// // with error policy that just ignores all errors (that can't ever happen)
/// let mut dp = FilterDispatcher::<Infallible, _>::new(|_| async {})
/// // Add 'handler' that will handle all messages sent to the bot
/// .message_handler(true, |mes: Message| async move {
/// println!("New message: {:?}", mes)
/// })
/// // Add 'handler' that will handle all
/// // messages edited in chat with the bot
/// .edited_message_handler(true, handle_edited_message);
///
/// // Start dispatching updates from long polling
/// dp.dispatch(polling(&bot)).await;
/// # }
/// ```
///
/// [`std::fmt::Debug`]: std::fmt::Debug
/// [updater]: crate::dispatching::updater
pub struct FilterDispatcher<'a, E, Ep> {
message_handlers: FiltersAndHandlers<'a, Message, E>,
edited_message_handlers: FiltersAndHandlers<'a, Message, E>,
channel_post_handlers: FiltersAndHandlers<'a, Message, E>,
edited_channel_post_handlers: FiltersAndHandlers<'a, Message, E>,
inline_query_handlers: FiltersAndHandlers<'a, InlineQuery, E>,
chosen_inline_result_handlers:
FiltersAndHandlers<'a, ChosenInlineResult, E>,
callback_query_handlers: FiltersAndHandlers<'a, CallbackQuery, E>,
error_policy: Ep,
}
impl<'a, E, Ep> FilterDispatcher<'a, E, Ep>
where
Ep: ErrorPolicy<E>,
E: std::fmt::Debug, // TODO: Is this really necessary?
{
pub fn new(error_policy: Ep) -> Self {
FilterDispatcher {
message_handlers: Vec::new(),
edited_message_handlers: Vec::new(),
channel_post_handlers: Vec::new(),
edited_channel_post_handlers: Vec::new(),
inline_query_handlers: Vec::new(),
chosen_inline_result_handlers: Vec::new(),
callback_query_handlers: Vec::new(),
error_policy,
}
}
pub fn message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.message_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
pub fn edited_message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.edited_message_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
pub fn channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.channel_post_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
pub fn edited_channel_post_handler<F, H>(
mut self,
filter: F,
handler: H,
) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.edited_channel_post_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
pub fn inline_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<InlineQuery> + 'a,
H: Handler<'a, InlineQuery, E> + 'a,
{
self.inline_query_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
pub fn chosen_inline_result_handler<F, H>(
mut self,
filter: F,
handler: H,
) -> Self
where
F: Filter<ChosenInlineResult> + 'a,
H: Handler<'a, ChosenInlineResult, E> + 'a,
{
self.chosen_inline_result_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
pub fn callback_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<CallbackQuery> + 'a,
H: Handler<'a, CallbackQuery, E> + 'a,
{
self.callback_query_handlers
.push(FilterAndHandler::new(filter, handler));
self
}
// TODO: Can someone simplify this?
pub async fn dispatch<U>(&mut self, updates: U)
where
U: Updater + 'a,
{
updates
.for_each(|res| {
async {
let Update { kind, id } = match res {
Ok(upd) => upd,
_ => return, // TODO: proper error handling
};
log::debug!(
"Handled update#{id:?}: {kind:?}",
id = id,
kind = kind
);
match kind {
UpdateKind::Message(mes) => {
self.handle(mes, &self.message_handlers).await
}
UpdateKind::EditedMessage(mes) => {
self.handle(mes, &self.edited_message_handlers)
.await;
}
UpdateKind::ChannelPost(post) => {
self.handle(post, &self.channel_post_handlers)
.await;
}
UpdateKind::EditedChannelPost(post) => {
self.handle(
post,
&self.edited_channel_post_handlers,
)
.await;
}
UpdateKind::InlineQuery(query) => {
self.handle(query, &self.inline_query_handlers)
.await;
}
UpdateKind::ChosenInlineResult(result) => {
self.handle(
result,
&self.chosen_inline_result_handlers,
)
.await;
}
UpdateKind::CallbackQuery(callback) => {
self.handle(
callback,
&self.callback_query_handlers,
)
.await;
}
}
}
})
.await;
}
#[allow(clippy::ptr_arg)] // TODO: proper fix
async fn handle<T>(
&self,
update: T,
handlers: &FiltersAndHandlers<'a, T, E>,
) where
T: std::fmt::Debug,
{
for x in handlers {
if x.filter.test(&update) {
if let Err(err) = x.handler.handle(update).await {
self.error_policy.handle_error(err).await
}
return;
}
}
log::warn!("unhandled update {:?}", update);
}
}
#[async_trait(? Send)]
impl<'a, U, E, Ep> Dispatcher<'a, U> for FilterDispatcher<'a, E, Ep>
where
E: std::fmt::Debug,
U: Updater + 'a,
Ep: ErrorPolicy<E>,
{
async fn dispatch(&'a mut self, updater: U) {
FilterDispatcher::dispatch(self, updater).await
}
}
#[cfg(test)]
mod tests {
use std::{
convert::Infallible,
sync::atomic::{AtomicI32, Ordering},
};
use futures::Stream;
use crate::{
dispatching::{
dispatchers::filter::FilterDispatcher, updater::StreamUpdater,
},
types::{
Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind,
Sender, Update, UpdateKind, User,
},
};
#[tokio::test]
async fn first_handler_executes_1_time() {
let counter = &AtomicI32::new(0);
let counter2 = &AtomicI32::new(0);
let mut dp = FilterDispatcher::<Infallible, _>::new(|_| async {})
.message_handler(true, |_mes: Message| async move {
counter.fetch_add(1, Ordering::SeqCst);
})
.message_handler(true, |_mes: Message| async move {
counter2.fetch_add(1, Ordering::SeqCst);
Ok::<_, Infallible>(())
});
dp.dispatch(one_message_updater()).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(counter2.load(Ordering::SeqCst), 0);
}
fn message() -> Message {
Message {
id: 6534,
date: 1_567_898_953,
chat: Chat {
id: 218_485_655,
photo: None,
kind: ChatKind::Private {
type_: (),
first_name: Some("W".to_string()),
last_name: None,
username: Some("WaffleLapkin".to_string()),
},
},
kind: MessageKind::Common {
from: Sender::User(User {
id: 457_569_668,
is_bot: true,
first_name: "BT".to_string(),
last_name: None,
username: Some("BloodyTestBot".to_string()),
language_code: None,
}),
forward_kind: ForwardKind::Origin {
reply_to_message: None,
},
edit_date: None,
media_kind: MediaKind::Text {
text: "text".to_string(),
entities: vec![],
},
reply_markup: None,
},
}
}
fn message_update() -> Update {
Update {
id: 0,
kind: UpdateKind::Message(message()),
}
}
fn one_message_updater(
) -> StreamUpdater<impl Stream<Item = Result<Update, Infallible>>> {
use futures::{future::ready, stream};
StreamUpdater::new(stream::once(ready(Ok(message_update()))))
}
}

View file

@ -1,3 +0,0 @@
pub use filter::FilterDispatcher;
pub mod filter;

View file

@ -0,0 +1,149 @@
//! Error handlers.
// Infallible used here instead of `!` to be compatible with rust <1.41.
use std::{convert::Infallible, fmt::Debug, future::Future, pin::Pin};
/// An asynchronous handler of an error.
pub trait ErrorHandler<E> {
#[must_use]
fn handle_error<'a>(
&'a self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a;
}
/// A handler that silently ignores all errors.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handlers::{ErrorHandler, Ignore};
///
/// Ignore.handle_error(()).await;
/// Ignore.handle_error(404).await;
/// Ignore.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Ignore;
impl<E> ErrorHandler<E> for Ignore {
#[must_use]
fn handle_error<'a>(
&'a self,
_: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async {})
}
}
/// An error handler that silently ignores all errors that can never happen
/// (e.g.: [`!`] or [`Infallible`]).
///
/// ## Examples
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use std::convert::{Infallible, TryInto};
///
/// use teloxide::dispatching::error_handlers::{ErrorHandler, IgnoreSafe};
///
/// let result: Result<String, Infallible> = "str".try_into();
/// match result {
/// Ok(string) => println!("{}", string),
/// Err(inf) => IgnoreSafe.handle_error(inf).await,
/// }
///
/// IgnoreSafe.handle_error(return).await; // return type of `return` is `!` (aka never)
/// # }
/// ```
///
/// ```compile_fail
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, IgnoreSafe,
/// };
///
/// IgnoreSafe.handle_error(0);
/// ```
///
/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html
/// [`Infallible`]: std::convert::Infallible
pub struct IgnoreSafe;
#[allow(unreachable_code)]
impl ErrorHandler<Infallible> for IgnoreSafe {
fn handle_error<'a>(
&'a self,
_: Infallible,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
Infallible: 'a,
{
Box::pin(async {})
}
}
/// An error handler that prints all errors passed into it.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handlers::{ErrorHandler, Print};
///
/// Print.handle_error(()).await;
/// Print.handle_error(404).await;
/// Print.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Print;
impl<E> ErrorHandler<E> for Print
where
E: Debug,
{
fn handle_error<'a>(
&'a self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
log::debug!("error: {:?}", error);
Box::pin(async {})
}
}
/// The implementation of `ErrorHandler` for `Fn(error) -> Future<Output = ()>`.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handlers::ErrorHandler;
///
/// let mut closure = |e: i32| async move { eprintln!("Error code{}", e) };
///
/// closure.handle_error(404).await;
/// # }
/// ```
impl<E, F, Fut> ErrorHandler<E> for F
where
F: Fn(E) -> Fut,
Fut: Future<Output = ()>,
{
fn handle_error<'a>(
&'a self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async move { self(error).await })
}
}

View file

@ -0,0 +1,372 @@
//! A dispatcher based on filters.
use futures::StreamExt;
use crate::{
dispatching::{filters::Filter, ErrorHandler, Handler, Updater},
types::{
CallbackQuery, ChosenInlineResult, InlineQuery, Message, Update,
UpdateKind,
},
};
use either::Either;
type FilterWithHandler<'a, T, E> =
(Box<dyn Filter<T> + 'a>, Box<dyn Handler<T, E> + 'a>);
type FiltersWithHandlers<'a, T, E> = Vec<FilterWithHandler<'a, T, E>>;
/// A dispatcher based on filters.
///
/// It consists of:
/// 1. [`ErrorHandler`] than handles errors both from [`Updater`] and
/// [`Handler`].
/// 2. Filters and handlers.
///
/// First you register filters and handlers using the methods defined below, and
/// then you call [`.dispatch(updater)`]. Filters and handlers are executed in
/// order of registering. The following flowchart represents how this dispatcher
/// acts:
///
/// <div align="center">
/// <img src="https://github.com/teloxide/teloxide/blob/dev/media/FILTER_DP_FLOWCHART.png" width="700" />
/// </div>
///
/// ## Examples
///
/// The simplest example:
/// ```no_run
/// # async fn run() {
/// use std::convert::Infallible;
///
/// use teloxide::{
/// dispatching::{updaters::polling_default, FilterDispatcher},
/// types::Message,
/// Bot,
/// };
///
/// async fn handle_edited_message(mes: Message) -> Result<(), Infallible> {
/// println!("Edited message: {:?}", mes);
/// Ok(())
/// }
///
/// let bot = Bot::new("TOKEN");
///
/// // Create a dispatcher which handlers can't fail with the
/// // error handler that just ignores all errors (that can't ever happen).
/// let mut dp = FilterDispatcher::<Infallible, _>::new(|_| async {})
/// // Add a handler, which handles all messages sent to the bot.
/// .message_handler(true, |mes: Message| async move {
/// println!("New message: {:?}", mes);
/// Ok(())
/// })
/// // Add a handler, which handles all messages edited in a chat
/// // with the bot.
/// .edited_message_handler(true, handle_edited_message);
///
/// // Start dispatching updates using long polling.
/// dp.dispatch(polling_default(&bot)).await;
/// # }
/// ```
///
/// [`std::fmt::Debug`]: std::fmt::Debug
/// [updater]: crate::dispatching::updater
/// [`.dispatch(updater)`]: FilterDispatcher::dispatch
pub struct FilterDispatcher<'a, E, Eh> {
message_handlers: FiltersWithHandlers<'a, Message, E>,
edited_message_handlers: FiltersWithHandlers<'a, Message, E>,
channel_post_handlers: FiltersWithHandlers<'a, Message, E>,
edited_channel_post_handlers: FiltersWithHandlers<'a, Message, E>,
inline_query_handlers: FiltersWithHandlers<'a, InlineQuery, E>,
chosen_inline_result_handlers:
FiltersWithHandlers<'a, ChosenInlineResult, E>,
callback_query_handlers: FiltersWithHandlers<'a, CallbackQuery, E>,
error_handler: Eh,
}
impl<'a, HandlerE, Eh> FilterDispatcher<'a, HandlerE, Eh> {
pub fn new<UpdaterE>(error_handler: Eh) -> Self
where
Eh: ErrorHandler<Either<UpdaterE, HandlerE>>,
{
FilterDispatcher {
message_handlers: Vec::new(),
edited_message_handlers: Vec::new(),
channel_post_handlers: Vec::new(),
edited_channel_post_handlers: Vec::new(),
inline_query_handlers: Vec::new(),
chosen_inline_result_handlers: Vec::new(),
callback_query_handlers: Vec::new(),
error_handler,
}
}
pub fn message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<Message, HandlerE> + 'a,
{
self.message_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn edited_message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<Message, HandlerE> + 'a,
{
self.edited_message_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<Message, HandlerE> + 'a,
{
self.channel_post_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn edited_channel_post_handler<F, H>(
mut self,
filter: F,
handler: H,
) -> Self
where
F: Filter<Message> + 'a,
H: Handler<Message, HandlerE> + 'a,
{
self.edited_channel_post_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn inline_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<InlineQuery> + 'a,
H: Handler<InlineQuery, HandlerE> + 'a,
{
self.inline_query_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn chosen_inline_result_handler<F, H>(
mut self,
filter: F,
handler: H,
) -> Self
where
F: Filter<ChosenInlineResult> + 'a,
H: Handler<ChosenInlineResult, HandlerE> + 'a,
{
self.chosen_inline_result_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn callback_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<CallbackQuery> + 'a,
H: Handler<CallbackQuery, HandlerE> + 'a,
{
self.callback_query_handlers
.push((Box::new(filter), Box::new(handler)));
self
}
pub async fn dispatch<UpdaterE, U>(&mut self, updater: U)
where
U: Updater<UpdaterE> + 'a,
Eh: ErrorHandler<Either<UpdaterE, HandlerE>>,
{
updater
.for_each_concurrent(None, |res| async {
let Update { kind, id } = match res {
Ok(upd) => upd,
Err(err) => {
self.error_handler
.handle_error(Either::Left(err))
.await;
return;
}
};
log::debug!(
"Handled update#{id:?}: {kind:?}",
id = id,
kind = kind
);
match kind {
UpdateKind::Message(mes) => {
Self::handle(
mes,
&self.message_handlers,
&self.error_handler,
)
.await
}
UpdateKind::EditedMessage(mes) => {
Self::handle(
mes,
&self.edited_message_handlers,
&self.error_handler,
)
.await;
}
UpdateKind::ChannelPost(post) => {
Self::handle(
post,
&self.channel_post_handlers,
&self.error_handler,
)
.await;
}
UpdateKind::EditedChannelPost(post) => {
Self::handle(
post,
&self.edited_channel_post_handlers,
&self.error_handler,
)
.await;
}
UpdateKind::InlineQuery(query) => {
Self::handle(
query,
&self.inline_query_handlers,
&self.error_handler,
)
.await;
}
UpdateKind::ChosenInlineResult(result) => {
Self::handle(
result,
&self.chosen_inline_result_handlers,
&self.error_handler,
)
.await;
}
UpdateKind::CallbackQuery(callback) => {
Self::handle(
callback,
&self.callback_query_handlers,
&self.error_handler,
)
.await;
}
}
})
.await;
}
async fn handle<T, UpdaterE>(
update: T,
handlers: &FiltersWithHandlers<'a, T, HandlerE>,
error_handler: &Eh,
) where
T: std::fmt::Debug,
Eh: ErrorHandler<Either<UpdaterE, HandlerE>>,
{
for x in handlers {
if x.0.test(&update) {
if let Err(err) = x.1.handle(update).await {
error_handler.handle_error(Either::Right(err)).await
}
return;
}
}
log::warn!("unhandled update {:?}", update);
}
}
#[cfg(test)]
mod tests {
use std::{
convert::Infallible,
sync::atomic::{AtomicI32, Ordering},
};
use crate::{
dispatching::{FilterDispatcher, Updater},
types::{
Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind,
Sender, Update, UpdateKind, User,
},
};
#[tokio::test]
async fn first_handler_executes_1_time() {
let counter = &AtomicI32::new(0);
let counter2 = &AtomicI32::new(0);
let mut dp = FilterDispatcher::<Infallible, _>::new(|_| async {})
.message_handler(true, |_mes: Message| async move {
counter.fetch_add(1, Ordering::SeqCst);
Ok::<_, Infallible>(())
})
.message_handler(true, |_mes: Message| async move {
counter2.fetch_add(1, Ordering::SeqCst);
Ok::<_, Infallible>(())
});
dp.dispatch(one_message_updater()).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(counter2.load(Ordering::SeqCst), 0);
}
fn message() -> Message {
Message {
id: 6534,
date: 1_567_898_953,
chat: Chat {
id: 218_485_655,
photo: None,
kind: ChatKind::Private {
type_: (),
first_name: Some("W".to_string()),
last_name: None,
username: Some("WaffleLapkin".to_string()),
},
},
kind: MessageKind::Common {
from: Sender::User(User {
id: 457_569_668,
is_bot: true,
first_name: "BT".to_string(),
last_name: None,
username: Some("BloodyTestBot".to_string()),
language_code: None,
}),
forward_kind: ForwardKind::Origin {
reply_to_message: None,
},
edit_date: None,
media_kind: MediaKind::Text {
text: "text".to_string(),
entities: vec![],
},
reply_markup: None,
},
}
}
fn message_update() -> Update {
Update {
id: 0,
kind: UpdateKind::Message(message()),
}
}
fn one_message_updater() -> impl Updater<Infallible> {
use futures::{future::ready, stream};
stream::once(ready(Ok(message_update())))
}
}

View file

@ -1,3 +1,5 @@
//! Filters of messages.
pub use main::*;
pub use command::*;

View file

@ -1,44 +1,30 @@
use std::{future::Future, pin::Pin};
use futures::FutureExt;
pub type HandlerResult<E> = Result<(), E>;
/// Asynchronous handler for event `T` (like `&self, I -> Future` fn)
pub trait Handler<'a, T, E> {
fn handle(
&self,
/// An asynchronous handler of a value.
pub trait Handler<T, E> {
#[must_use]
fn handle<'a>(
&'a self,
value: T,
) -> Pin<Box<dyn Future<Output = HandlerResult<E>> + 'a>>;
) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
where
T: 'a;
}
pub trait IntoHandlerResult<E> {
fn into_hr(self) -> HandlerResult<E>;
}
impl<E> IntoHandlerResult<E> for () {
fn into_hr(self) -> HandlerResult<E> {
Ok(())
}
}
impl<E> IntoHandlerResult<E> for HandlerResult<E> {
fn into_hr(self) -> HandlerResult<E> {
self
}
}
impl<'a, F, Fut, R, T, E> Handler<'a, T, E> for F
/// The implementation of `Handler` for `Fn(U) -> Future<Output = Result<(),
/// E>>`.
impl<T, E, F, Fut> Handler<T, E> for F
where
F: Fn(T) -> Fut,
Fut: Future<Output = R> + 'a,
R: IntoHandlerResult<E> + 'a,
E: 'a,
Fut: Future<Output = Result<(), E>>,
{
fn handle(
&self,
fn handle<'a>(
&'a self,
value: T,
) -> Pin<Box<dyn Future<Output = HandlerResult<E>> + 'a>> {
Box::pin(self(value).map(IntoHandlerResult::into_hr))
) -> Pin<Box<dyn Future<Output = Fut::Output> + 'a>>
where
T: 'a,
{
Box::pin(async move { self(value).await })
}
}

View file

@ -1,15 +1,13 @@
//! Update dispatching.
use async_trait::async_trait;
pub mod error_handlers;
mod filter_dp;
pub mod filters;
mod handler;
pub mod updaters;
pub use error_handlers::ErrorHandler;
pub use filter_dp::FilterDispatcher;
pub use filters::Filter;
pub use handler::Handler;
pub mod dispatchers;
pub mod filters;
pub mod handler;
pub mod updater;
#[async_trait(? Send)]
pub trait Dispatcher<'a, U> {
async fn dispatch(&'a mut self, updater: U);
}
pub use updaters::Updater;

View file

@ -1,159 +0,0 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{stream, Stream, StreamExt};
use pin_project::pin_project;
use crate::{bot::Bot, types::Update, RequestError};
// Currently just a placeholder, but I'll add here some methods
/// Updater is stream of updates.
///
/// Telegram supports 2 ways of [getting updates]: [long polling](Long Polling)
/// and webhook
///
/// ## Long Polling
///
/// In long polling ([wiki]) you just call [GetUpdates] every N seconds.
///
/// #### Example:
///
/// <pre>
/// tg bot
/// | |
/// |<---------------------------| Updates? (GetUpdates call)
/// ↑ ↑
/// | timeout<a id="1b" href="#1">^1</a> |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay between GetUpdates<a id="2b" href="#2">^2</a> |
/// ↓ ↓
/// |<---------------------------| Updates?
/// ↑ ↑
/// | timeout<a id="3b" href="#3">^3</a> |
/// ↓ ↓
/// Yes |-------[updates 0, 1]------>|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 1]--------| Updates?<a id="4b" href="#4">^4</a>
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Yes |---------[update 2]-------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Yes |-------[updates 2..5]------>|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 5]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// | |
/// ~ and so on, and so on ~
/// </pre>
///
/// <a id="1" href="#1b">^1</a> Timeout can be even 0
/// (this is also called short polling),
/// but you should use it **only** for testing purposes
///
/// <a id="2" href="#2b">^2</a> Large delays will cause in bot lags,
/// so delay shouldn't exceed second.
///
/// <a id="3" href="#3b">^3</a> Note that if telegram already have updates for
/// you it will answer you **without** waiting for a timeout
///
/// <a id="4" href="#4b">^4</a> `offset = N` means that we've already received
/// updates `0..=N`
///
/// [GetUpdates]: crate::requests::payloads::GetUpdates
/// [getting updates]: https://core.telegram.org/bots/api#getting-updates
/// [wiki]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
pub trait Updater:
Stream<Item = Result<Update, <Self as Updater>::Error>>
{
type Error;
}
#[pin_project]
pub struct StreamUpdater<S> {
#[pin]
stream: S,
}
impl<S> StreamUpdater<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S, E> Stream for StreamUpdater<S>
where
S: Stream<Item = Result<Update, E>>,
{
type Item = Result<Update, E>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
}
impl<S, E> Updater for StreamUpdater<S>
where
S: Stream<Item = Result<Update, E>>,
{
type Error = E;
}
pub fn polling<'a>(bot: &'a Bot) -> impl Updater<Error = RequestError> + 'a {
let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move {
let updates = match bot.get_updates().offset(offset).send().await {
Ok(updates) => {
if let Some(upd) = updates.last() {
offset = upd.id + 1;
}
updates.into_iter().map(Ok).collect::<Vec<_>>()
}
Err(err) => vec![Err(err)],
};
Some((stream::iter(updates), (bot, offset)))
})
.flatten();
StreamUpdater { stream }
}
// TODO implement webhook (this actually require webserver and probably we
// should add cargo feature that adds webhook)
//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater<impl
// Stream<Item=Result<Update, ???>> + 'a> {}

171
src/dispatching/updaters.rs Normal file
View file

@ -0,0 +1,171 @@
//! Receiving updates from Telegram.
//!
//! The key trait here is [`Updater`]. You can get it by these functions:
//!
//! - [`polling_default`], which returns a default long polling updater.
//! - [`polling`], which returns a long/short polling updater with your
//! configuration.
//!
//! And then you can pass it directly to a dispatcher.
//!
//! Telegram supports two ways of [getting updates]: [long]/[short] polling and
//! [webhook].
//!
//! # Long Polling
//!
//! In long polling, you just call [`Box::get_updates`] every N seconds.
//!
//! ## Example
//!
//! <pre>
//! tg bot
//! | |
//! |<---------------------------| Updates? (Bot::get_updates call)
//! ↑ ↑
//! | timeout<a id="1b" href="#1">^1</a> |
//! ↓ ↓
//! Nope |--------------------------->|
//! ↑ ↑
//! | delay between Bot::get_updates<a id="2b" href="#2">^2</a> |
//! ↓ ↓
//! |<---------------------------| Updates?
//! ↑ ↑
//! | timeout<a id="3b" href="#3">^3</a> |
//! ↓ ↓
//! Yes |-------[updates 0, 1]------>|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 1]--------| Updates?<a id="4b" href="#4">^4</a>
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Yes |---------[update 2]-------->|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 2]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Nope |--------------------------->|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 2]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Nope |--------------------------->|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 2]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Yes |-------[updates 2..5]------>|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 5]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Nope |--------------------------->|
//! | |
//! ~ and so on, and so on ~
//! </pre>
//!
//! <a id="1" href="#1b">^1</a> A timeout can be even 0
//! (this is also called short polling),
//! but you should use it **only** for testing purposes.
//!
//! <a id="2" href="#2b">^2</a> Large delays will cause in bot lags,
//! so delay shouldn't exceed second.
//!
//! <a id="3" href="#3b">^3</a> Note that if Telegram already have updates for
//! you it will answer you **without** waiting for a timeout.
//!
//! <a id="4" href="#4b">^4</a> `offset = N` means that we've already received
//! updates `0..=N`.
//!
//! [`Updater`]: Updater
//! [`polling_default`]: polling_default
//! [`polling`]: polling
//! [`Dispatcher`]: crate::dispatching::Dispatcher::dispatch
//! [`Box::get_updates`]: crate::Bot::get_updates
//! [getting updates]: https://core.telegram.org/bots/api#getting-updates
//! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
//! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science)
//! [webhook]: https://en.wikipedia.org/wiki/Webhook
use futures::{stream, Stream, StreamExt};
use crate::{
bot::Bot, requests::payloads::AllowedUpdate, types::Update, RequestError,
};
use std::{convert::TryInto, time::Duration};
/// A generic updater.
pub trait Updater<E>: Stream<Item = Result<Update, E>> {
// TODO: add some methods here (.shutdown(), etc).
}
impl<S, E> Updater<E> for S where S: Stream<Item = Result<Update, E>> {}
/// Returns a long polling updater with the default configuration.
///
/// See also: [`polling`](polling).
pub fn polling_default(bot: &Bot) -> impl Updater<RequestError> + '_ {
polling(bot, None, None, None)
}
/// Returns a long/short polling updater with some additional options.
///
/// - `bot`: Using this bot, the returned updater will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::requests::payloads::GetUpdates
pub fn polling(
bot: &Bot,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl Updater<RequestError> + '_ {
let timeout =
timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
stream::unfold(
(allowed_updates, bot, 0),
move |(mut allowed_updates, bot, mut offset)| async move {
let mut req = bot.get_updates().offset(offset);
req.payload.timeout = timeout;
req.payload.limit = limit;
req.payload.allowed_updates = allowed_updates.take();
let updates = match req.send().await {
Err(err) => vec![Err(err)],
Ok(updates) => {
if let Some(upd) = updates.last() {
offset = upd.id + 1;
}
updates.into_iter().map(Ok).collect::<Vec<_>>()
}
};
Some((stream::iter(updates), (allowed_updates, bot, offset)))
},
)
.flatten()
}
// TODO implement webhook (this actually require webserver and probably we
// should add cargo feature that adds webhook)
//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater<impl
// Stream<Item=Result<Update, ???>> + 'a> {}