Rework teloxide::dispatching (fails now)

This commit is contained in:
Temirkhan Myrzamadi 2020-01-29 10:47:17 +06:00
parent e2619d778c
commit 84d4e6fb2c
12 changed files with 532 additions and 205 deletions

View file

@ -1397,6 +1397,7 @@ impl Bot {
/// - `ok`: Specify `true` if everything is alright (goods are available,
/// etc.) and the bot is ready to proceed with the order. Use False if
/// there are any problems.
///
/// [`Update`]: crate::types::Update
pub fn answer_pre_checkout_query<P>(
&self,

View file

@ -1,18 +0,0 @@
use serde::{Deserialize, Serialize};
use crate::types::{CallbackQuery, Message};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ChatUpdate {
pub id: i32,
pub kind: ChatUpdateKind,
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ChatUpdateKind {
Message(Message),
EditedMessage(Message),
CallbackQuery(CallbackQuery),
}

View file

@ -1,106 +0,0 @@
use super::{
super::DispatchResult,
storage::{InMemStorage, Storage},
};
use crate::{
dispatching::{
chat::{ChatUpdate, ChatUpdateKind},
Handler, SessionState,
},
types::{Update, UpdateKind},
};
/// A dispatcher that dispatches updates from chats.
pub struct Dispatcher<'a, Session, H> {
storage: Box<dyn Storage<Session> + 'a>,
handler: H,
}
impl<'a, Session, H> Dispatcher<'a, Session, H>
where
Session: Default + 'a,
H: Handler<Session, ChatUpdate>,
{
/// Creates a dispatcher with the specified `handler` and [`InMemStorage`]
/// (a default storage).
///
/// [`InMemStorage`]: crate::dispatching::private::InMemStorage
pub fn new(handler: H) -> Self {
Self {
storage: Box::new(InMemStorage::default()),
handler,
}
}
/// Creates a dispatcher with the specified `handler` and `storage`.
pub fn with_storage<Stg>(handler: H, storage: Stg) -> Self
where
Stg: Storage<Session> + 'a,
{
Self {
storage: Box::new(storage),
handler,
}
}
/// Dispatches a single `update`.
///
/// ## Returns
/// Returns [`DispatchResult::Handled`] if `update` was supplied to a
/// handler, and [`DispatchResult::Unhandled`] if it was an update not
/// from a chat.
///
/// [`DispatchResult::Handled`]: crate::dispatching::DispatchResult::Handled
/// [`DispatchResult::Unhandled`]:
/// crate::dispatching::DispatchResult::Unhandled
pub async fn dispatch(&mut self, update: Update) -> DispatchResult {
let chat_update = match update.kind {
UpdateKind::Message(msg) => ChatUpdate {
id: update.id,
kind: ChatUpdateKind::Message(msg),
},
UpdateKind::EditedMessage(msg) => ChatUpdate {
id: update.id,
kind: ChatUpdateKind::EditedMessage(msg),
},
UpdateKind::CallbackQuery(query) => ChatUpdate {
id: update.id,
kind: ChatUpdateKind::CallbackQuery(query),
},
_ => return DispatchResult::Unhandled,
};
let chat_id = match &chat_update.kind {
ChatUpdateKind::Message(msg) => msg.chat.id,
ChatUpdateKind::EditedMessage(msg) => msg.chat.id,
ChatUpdateKind::CallbackQuery(query) => match &query.message {
None => return DispatchResult::Unhandled,
Some(msg) => msg.chat.id,
},
};
let session = self
.storage
.remove_session(chat_id)
.await
.unwrap_or_default();
if let SessionState::Continue(session) =
self.handler.handle(session, chat_update).await
{
if self
.storage
.update_session(chat_id, session)
.await
.is_some()
{
panic!(
"We previously storage.remove_session() so \
storage.update_session() must return None"
);
}
}
DispatchResult::Handled
}
}

View file

@ -1,41 +0,0 @@
//! Dispatching updates from chats.
//!
//! There are four main components:
//!
//! 1. Your session type `Session`, which designates a dialogue state at the
//! current moment.
//! 2. [`Storage`] that encapsulates all the sessions.
//! 3. Your handler of type `H: async Fn(Session, Update) ->
//! HandleResult<Session>` that receives an update and turns your session into
//! the next state.
//! 4. [`Dispatcher`], which encapsulates your handler and [`Storage`], and has
//! the [`dispatch(Update) -> DispatchResult`] function.
//!
//! Every time you call [`.dispatch(update)`] on your dispatcher, the following
//! steps are executed:
//!
//! 1. If a supplied update is not from a chat, return
//! [`DispatchResult::Unhandled`].
//! 2. If a storage doesn't contain a session from this chat, supply
//! `Session::default()` into you handler, otherwise, supply the previous
//! session.
//! 3. If a handler has returned [`SessionState::Terminate`], remove the
//! session from a storage, otherwise force the storage to update the session.
//!
//! [`Storage`]: crate::dispatching::private::Storage
//! [`Dispatcher`]: crate::dispatching::private::Dispatcher
//! [`dispatch(Update) -> DispatchResult`]:
//! crate::dispatching::private::Dispatcher::dispatch
//! [`.dispatch(update)`]: crate::dispatching::private::Dispatcher::dispatch
//! [`DispatchResult::Unhandled`]: crate::dispatching::DispatchResult::Unhandled
//! [`SessionState::Terminate`]: crate::dispatching::SessionState::Terminate
// TODO: examples
mod chat_update;
mod dispatcher;
mod storage;
pub use chat_update::*;
pub use dispatcher::*;
pub use storage::*;

View file

@ -0,0 +1,280 @@
use crate::{
dispatching::{
error_handlers,
session::{SessionDispatcher, SessionHandlerCtx, SessionState},
update_listeners,
update_listeners::UpdateListener,
Handler,
},
types::{
CallbackQuery, ChatKind, ChosenInlineResult, InlineQuery, Message,
Poll, PreCheckoutQuery, ShippingQuery, UpdateKind,
},
Bot,
};
use futures::StreamExt;
use std::fmt::Debug;
pub struct BasicHandlerCtx<'a, Upd> {
bot: &'a Bot,
update: Upd,
}
pub struct Dispatcher<'a, Session1, Session2, H1, H2, HandlerE> {
bot: &'a Bot,
handlers_error_handler: Box<dyn Handler<HandlerE, ()>>,
private_message_dp: Option<SessionDispatcher<'a, Session1, H1>>,
private_edited_message_dp: Option<SessionDispatcher<'a, Session2, H2>>,
message_handler: Option<Box<dyn Handler<BasicHandlerCtx<'a, Message>, ()>>>,
edited_message_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, Message>, ()>>>,
channel_post_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, Message>, ()>>>,
edited_channel_post_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, Message>, ()>>>,
inline_query_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, InlineQuery>, ()>>>,
chosen_inline_result_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, ChosenInlineResult>, ()>>>,
callback_query_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, CallbackQuery>, ()>>>,
shipping_query_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, ShippingQuery>, ()>>>,
pre_checkout_query_handler:
Option<Box<dyn Handler<BasicHandlerCtx<'a, PreCheckoutQuery>, ()>>>,
poll_handler: Option<Box<dyn Handler<BasicHandlerCtx<'a, Poll>, ()>>>,
}
impl<'a, Session1, Session2, H1, H2, HandlerE>
Dispatcher<'a, Session1, Session2, H1, H2, HandlerE>
where
Session1: Default,
Session2: Default,
H1: Handler<
SessionHandlerCtx<'a, Message, Session1>,
SessionState<Session1>,
>,
H2: Handler<
SessionHandlerCtx<'a, Message, Session2>,
SessionState<Session2>,
>,
HandlerE: Debug,
{
pub fn new(bot: &'a Bot) -> Self {
Self {
bot,
handlers_error_handler: Box::new(error_handlers::Log),
private_message_dp: None,
private_edited_message_dp: None,
message_handler: None,
edited_message_handler: None,
channel_post_handler: None,
edited_channel_post_handler: None,
inline_query_handler: None,
chosen_inline_result_handler: None,
callback_query_handler: None,
shipping_query_handler: None,
pre_checkout_query_handler: None,
poll_handler: None,
}
}
pub fn private_message_dp(
mut self,
dp: SessionDispatcher<'a, Session1, H1>,
) -> Self {
self.private_message_dp = Some(dp);
self
}
async fn dispatch(&'a mut self)
where
Session1: 'a,
Session2: 'a,
H1: 'a,
H2: 'a,
HandlerE: 'a,
{
self.dispatch_with_listener(
update_listeners::polling_default(self.bot),
error_handlers::Log,
)
.await;
}
async fn dispatch_with_listener<UListener, ListenerE, Eh>(
&'a mut self,
update_listener: UListener,
update_listener_error_handler: Eh,
) where
UListener: UpdateListener<ListenerE> + 'a,
Eh: Handler<ListenerE, ()> + 'a,
Session1: 'a,
Session2: 'a,
H1: 'a,
H2: 'a,
HandlerE: 'a,
ListenerE: Debug,
{
update_listener
.for_each_concurrent(None, move |update| async {
let update = match update {
Ok(update) => update,
Err(error) => {
update_listener_error_handler.handle(error).await;
return;
}
};
match update.kind {
UpdateKind::Message(message) => match message.chat.kind {
ChatKind::Private { .. } => {
if let Some(private_message_dp) =
&mut self.private_message_dp
{
private_message_dp
.dispatch(self.bot, message)
.await;
}
}
_ => {
if let Some(message_handler) =
&mut self.message_handler
{
message_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: message,
})
.await
}
}
},
UpdateKind::EditedMessage(message) => {
match message.chat.kind {
ChatKind::Private { .. } => {
if let Some(private_edited_message_dp) =
&mut self.private_edited_message_dp
{
private_edited_message_dp
.dispatch(self.bot, message)
.await;
}
}
_ => {
if let Some(edited_message_handler) =
&mut self.edited_message_handler
{
edited_message_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: message,
})
.await
}
}
}
}
UpdateKind::ChannelPost(post) => {
if let Some(channel_post_handler) =
&mut self.channel_post_handler
{
channel_post_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: post,
})
.await;
}
}
UpdateKind::EditedChannelPost(post) => {
if let Some(edited_channel_post_handler) =
&mut self.edited_channel_post_handler
{
edited_channel_post_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: post,
})
.await;
}
}
UpdateKind::InlineQuery(query) => {
if let Some(inline_query_handler) =
&mut self.inline_query_handler
{
inline_query_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: query,
})
.await;
}
}
UpdateKind::ChosenInlineResult(result) => {
if let Some(chosen_inline_result_handler) =
&mut self.chosen_inline_result_handler
{
chosen_inline_result_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: result,
})
.await;
}
}
UpdateKind::CallbackQuery(query) => {
if let Some(callback_query_handler) =
&mut self.callback_query_handler
{
callback_query_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: query,
})
.await;
}
}
UpdateKind::ShippingQuery(query) => {
if let Some(shipping_query_handler) =
&mut self.shipping_query_handler
{
shipping_query_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: query,
})
.await;
}
}
UpdateKind::PreCheckoutQuery(query) => {
if let Some(pre_checkout_query_handler) =
&mut self.pre_checkout_query_handler
{
pre_checkout_query_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: query,
})
.await;
}
}
UpdateKind::Poll(poll) => {
if let Some(poll_handler) = &mut self.poll_handler {
poll_handler
.handle(BasicHandlerCtx {
bot: self.bot,
update: poll,
})
.await;
}
}
}
})
.await
}
}

View file

@ -0,0 +1,100 @@
use crate::dispatching::Handler;
use std::{convert::Infallible, fmt::Debug, future::Future, pin::Pin};
/// A handler that silently ignores all errors.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handlers::{ErrorHandler, Ignore};
///
/// Ignore.handle_error(()).await;
/// Ignore.handle_error(404).await;
/// Ignore.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Ignore;
impl<E> Handler<E, ()> for Ignore {
fn handle<'a>(&'a self, _: E) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async {})
}
}
/// An error handler that silently ignores all errors that can never happen
/// (e.g.: [`!`] or [`Infallible`]).
///
/// ## Examples
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use std::convert::{Infallible, TryInto};
///
/// use teloxide::dispatching::error_handlers::{ErrorHandler, IgnoreSafe};
///
/// let result: Result<String, Infallible> = "str".try_into();
/// match result {
/// Ok(string) => println!("{}", string),
/// Err(inf) => IgnoreSafe.handle_error(inf).await,
/// }
///
/// IgnoreSafe.handle_error(return;).await; // return type of `return` is `!` (aka never)
/// # }
/// ```
///
/// ```compile_fail
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, IgnoreSafe,
/// };
///
/// IgnoreSafe.handle_error(0);
/// ```
///
/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html
/// [`Infallible`]: std::convert::Infallible
pub struct IgnoreSafe;
#[allow(unreachable_code)]
impl Handler<Infallible, ()> for IgnoreSafe {
fn handle<'a>(
&'a self,
_: Infallible,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
Infallible: 'a,
{
Box::pin(async {})
}
}
/// An error handler that prints all errors passed into it.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handlers::{ErrorHandler, Log};
///
/// Log.handle_error(()).await;
/// Log.handle_error(404).await;
/// Log.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Log;
impl<E> Handler<E, ()> for Log
where
E: Debug,
{
fn handle<'a>(&'a self, error: E) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
log::debug!("error: {:?}", error);
Box::pin(async {})
}
}

View file

@ -1,49 +1,29 @@
use std::{future::Future, pin::Pin};
/// Continue or terminate a user session.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum SessionState<Session> {
Continue(Session),
Terminate,
}
/// A handler of a user session and an update.
///
/// ## Returns
/// Returns [`SessionState::Continue(session)`] if it wants to be called again
/// after a new update, or [`SessionState::Terminate`] if not.
///
/// [`SessionState::Continue(session)`]:
/// crate::dispatching::SessionState::Continue
/// [`SessionState::Terminate`]: crate::dispatching::SessionState::Terminate
pub trait Handler<Session, U> {
/// An asynchronous polymorphic handler of a context.
pub trait Handler<Ctx, Output> {
#[must_use]
fn handle<'a>(
&'a self,
session: Session,
update: U,
) -> Pin<Box<dyn Future<Output = SessionState<Session>> + 'a>>
ctx: Ctx,
) -> Pin<Box<dyn Future<Output = Output> + 'a>>
where
Session: 'a,
U: 'a;
Ctx: 'a;
}
/// The implementation of `Handler` for `Fn(Session, Update) -> Future<Output =
/// SessionState<Session>>`.
impl<Session, U, F, Fut> Handler<Session, U> for F
/// The implementation of `Handler` for `Fn(Ctx) -> Future<Output = Output>`.
impl<Ctx, Output, F, Fut> Handler<Ctx, Output> for F
where
F: Fn(Session, U) -> Fut,
Fut: Future<Output = SessionState<Session>>,
F: Fn(Ctx) -> Fut,
Fut: Future<Output = Output>,
{
fn handle<'a>(
&'a self,
session: Session,
update: U,
ctx: Ctx,
) -> Pin<Box<dyn Future<Output = Fut::Output> + 'a>>
where
Session: 'a,
U: 'a,
Ctx: 'a,
{
Box::pin(async move { self(session, update).await })
Box::pin(async move { self(ctx).await })
}
}

View file

@ -1,14 +1,10 @@
//! Update dispatching.
/// If an update was handled by a dispatcher or not.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum DispatchResult {
Handled,
Unhandled,
}
pub mod chat;
mod dispatcher;
pub mod error_handlers;
mod handler;
pub mod session;
pub mod update_listeners;
pub use dispatcher::*;
pub use handler::*;

View file

@ -0,0 +1,13 @@
use crate::types::Message;
/// Something that has a chat ID.
pub trait GetChatId {
#[must_use]
fn chat_id(&self) -> i64;
}
impl GetChatId for Message {
fn chat_id(&self) -> i64 {
self.chat.id
}
}

View file

@ -0,0 +1,122 @@
//! Dispatching user sessions.
//!
//! There are four main components:
//!
//! 1. Your session type `Session`, which designates a dialogue state at the
//! current moment.
//! 2. [`Storage`] that encapsulates all the sessions.
//! 3. Your handler of type `H: async Fn(Session, Update) ->
//! SessionState<Session>` that receives an update and turns your session into
//! the next state.
//! 4. [`SessionDispatcher`], which encapsulates your handler and
//! [`Storage`], and has the [`dispatch(Bot, Upd)`] function.
//!
//! Every time you call `.dispatch(bot, update)` on your dispatcher, the
//! following steps are executed:
//!
//! 1. If a storage doesn't contain a session from this chat, supply
//! `Session::default()` into you handler, otherwise, supply the previous
//! session.
//! 3. If a handler has returned [`SessionState::Terminate`], remove the
//! session from a storage, otherwise force the storage to update the session.
//!
//! [`Storage`]: crate::dispatching::Storage
//! [`SessionDispatcher`]: crate::dispatching::SessionDispatcher
//! [`dispatch(Bot, Upd)`]:
//! crate::dispatching::SessionDispatcher::dispatch
//! [`SessionState::Terminate`]: crate::dispatching::SessionState::Terminate
// TODO: examples
mod get_chat_id;
mod storage;
use crate::{dispatching::Handler, Bot};
pub use get_chat_id::*;
pub use storage::*;
/// A context of a private message handler.
pub struct SessionHandlerCtx<'a, Upd, Session> {
pub bot: &'a Bot,
pub update: Upd,
pub session: Session,
}
/// Continue or terminate a user session.
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum SessionState<Session> {
Continue(Session),
Terminate,
}
/// A dispatcher of user sessions.
pub struct SessionDispatcher<'a, Session, H> {
storage: Box<dyn Storage<Session> + 'a>,
handler: H,
}
impl<'a, Session, H> SessionDispatcher<'a, Session, H>
where
Session: Default + 'a,
{
/// Creates a dispatcher with the specified `handler` and [`InMemStorage`]
/// (a default storage).
///
/// [`InMemStorage`]: crate::dispatching::InMemStorage
#[must_use]
pub fn new(handler: H) -> Self {
Self {
storage: Box::new(InMemStorage::default()),
handler,
}
}
/// Creates a dispatcher with the specified `handler` and `storage`.
#[must_use]
pub fn with_storage<Stg>(handler: H, storage: Stg) -> Self
where
Stg: Storage<Session> + 'a,
{
Self {
storage: Box::new(storage),
handler,
}
}
/// Dispatches a single `message` from a private chat.
pub async fn dispatch<Upd>(&'a mut self, bot: &'a Bot, update: Upd)
where
H: Handler<SessionHandlerCtx<'a, Upd, Session>, SessionState<Session>>,
Upd: GetChatId,
{
let chat_id = update.chat_id();
let session = self
.storage
.remove_session(chat_id)
.await
.unwrap_or_default();
if let SessionState::Continue(new_session) = self
.handler
.handle(SessionHandlerCtx {
bot,
update,
session,
})
.await
{
if self
.storage
.update_session(chat_id, new_session)
.await
.is_some()
{
panic!(
"We previously storage.remove_session() so \
storage.update_session() must return None"
);
}
}
}
}