Make handlers accept streams

This commit is contained in:
Temirkhan Myrzamadi 2020-02-18 04:19:16 +06:00
parent fce8ba302d
commit 9eda683fc5
17 changed files with 678 additions and 682 deletions

View file

@ -18,11 +18,12 @@ maintenance = { status = "actively-developed" }
serde_json = "1.0.44" serde_json = "1.0.44"
serde = { version = "1.0.101", features = ["derive"] } serde = { version = "1.0.101", features = ["derive"] }
tokio = { version = "0.2.6", features = ["full"] } tokio = { version = "0.2.11", features = ["full"] }
tokio-util = { version = "0.2.0", features = ["full"] } tokio-util = { version = "0.2.0", features = ["full"] }
reqwest = { version = "0.10", features = ["json", "stream", "native-tls-vendored"] } reqwest = { version = "0.10", features = ["json", "stream", "native-tls-vendored"] }
log = "0.4.8" log = "0.4.8"
lockfree = "0.5.1"
bytes = "0.5.3" bytes = "0.5.3"
mime = "0.3.16" mime = "0.3.16"
@ -40,3 +41,4 @@ teloxide-macros = { path = "teloxide-macros" }
smart-default = "0.6.0" smart-default = "0.6.0"
rand = "0.7.3" rand = "0.7.3"
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
lazy_static = "1.4.0"

View file

@ -11,12 +11,11 @@ async fn run() {
let bot = Bot::from_env(); let bot = Bot::from_env();
// Create a dispatcher with a single message handler that answers "pong" to Dispatcher::new(bot)
// each incoming message. .messages_handler(|messages: DispatcherHandlerRx<Message>| {
Dispatcher::<RequestError>::new(bot) messages.for_each_concurrent(None, |message| async move {
.message_handler(&|ctx: DispatcherHandlerCtx<Message>| async move { message.answer("pong").send().await;
ctx.answer("pong").send().await?; })
Ok(())
}) })
.dispatch() .dispatch()
.await; .await;

View file

@ -1,31 +0,0 @@
use std::{future::Future, pin::Pin};
/// An asynchronous handler of a context.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching).
pub trait CtxHandler<Ctx, Output> {
#[must_use]
fn handle_ctx<'a>(
&'a self,
ctx: Ctx,
) -> Pin<Box<dyn Future<Output = Output> + 'a>>
where
Ctx: 'a;
}
impl<Ctx, Output, F, Fut> CtxHandler<Ctx, Output> for F
where
F: Fn(Ctx) -> Fut,
Fut: Future<Output = Output>,
{
fn handle_ctx<'a>(
&'a self,
ctx: Ctx,
) -> Pin<Box<dyn Future<Output = Fut::Output> + 'a>>
where
Ctx: 'a,
{
Box::pin(async move { self(ctx).await })
}
}

View file

@ -1,97 +1,319 @@
use crate::dispatching::{ use crate::dispatching::{
dialogue::{ dialogue::{
DialogueHandlerCtx, DialogueStage, GetChatId, InMemStorage, Storage, DialogueDispatcherHandler, DialogueDispatcherHandlerCtx, DialogueStage,
GetChatId, InMemStorage, Storage,
}, },
CtxHandler, DispatcherHandlerCtx, DispatcherHandler, DispatcherHandlerCtx,
}; };
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use futures::StreamExt;
use tokio::sync::mpsc;
use lockfree::map::Map;
use std::sync::Arc;
/// A dispatcher of dialogues. /// A dispatcher of dialogues.
/// ///
/// Note that `DialogueDispatcher` implements `CtxHandler`, so you can just put /// Note that `DialogueDispatcher` implements [`DispatcherHandler`], so you can
/// an instance of this dispatcher into the [`Dispatcher`]'s methods. /// just put an instance of this dispatcher into the [`Dispatcher`]'s methods.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
/// ///
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher
pub struct DialogueDispatcher<'a, D, H> { /// [`DispatcherHandler`]: crate::dispatching::DispatcherHandler
storage: Box<dyn Storage<D> + 'a>, pub struct DialogueDispatcher<D, H, Upd> {
handler: H, storage: Arc<dyn Storage<D> + Send + Sync + 'static>,
handler: Arc<H>,
/// A lock-free map to handle updates from the same chat sequentially, but
/// concurrently from different chats.
///
/// A value is the TX part of an unbounded asynchronous MPSC channel. A
/// handler that executes updates from the same chat ID sequentially
/// handles the RX part.
senders: Arc<Map<i64, mpsc::UnboundedSender<DispatcherHandlerCtx<Upd>>>>,
} }
impl<'a, D, H> DialogueDispatcher<'a, D, H> impl<D, H, Upd> DialogueDispatcher<D, H, Upd>
where where
D: Default + 'a, H: DialogueDispatcherHandler<Upd, D> + Send + Sync + 'static,
Upd: GetChatId + Send + Sync + 'static,
D: Default + Send + Sync + 'static,
{ {
/// Creates a dispatcher with the specified `handler` and [`InMemStorage`] /// Creates a dispatcher with the specified `handler` and [`InMemStorage`]
/// (a default storage). /// (a default storage).
/// ///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[must_use] #[must_use]
pub fn new(handler: H) -> Self { pub fn new(handler: H) -> Arc<Self> {
Self { Arc::new(Self {
storage: Box::new(InMemStorage::default()), storage: InMemStorage::new(),
handler, handler: Arc::new(handler),
} senders: Arc::new(Map::new()),
})
} }
/// Creates a dispatcher with the specified `handler` and `storage`. /// Creates a dispatcher with the specified `handler` and `storage`.
#[must_use] #[must_use]
pub fn with_storage<Stg>(handler: H, storage: Stg) -> Self pub fn with_storage<Stg>(handler: H, storage: Arc<Stg>) -> Arc<Self>
where where
Stg: Storage<D> + 'a, Stg: Storage<D> + Sync + Send + 'static,
{ {
Self { Arc::new(Self {
storage: Box::new(storage), storage,
handler, handler: Arc::new(handler),
} senders: Arc::new(Map::new()),
})
}
#[must_use]
fn new_tx(&self) -> mpsc::UnboundedSender<DispatcherHandlerCtx<Upd>> {
let (tx, rx) = mpsc::unbounded_channel();
let storage = Arc::clone(&self.storage);
let handler = Arc::clone(&self.handler);
let senders = Arc::clone(&self.senders);
tokio::spawn(rx.for_each(move |ctx: DispatcherHandlerCtx<Upd>| {
let storage = Arc::clone(&storage);
let handler = Arc::clone(&handler);
let senders = Arc::clone(&senders);
async move {
let chat_id = ctx.update.chat_id();
let dialogue = Arc::clone(&storage)
.remove_dialogue(chat_id)
.await
.unwrap_or_default();
match handler
.handle(DialogueDispatcherHandlerCtx {
bot: ctx.bot,
update: ctx.update,
dialogue,
})
.await
{
DialogueStage::Next(new_dialogue) => {
update_dialogue(
Arc::clone(&storage),
chat_id,
new_dialogue,
)
.await;
}
DialogueStage::Exit => {
// On the next .poll() call, the spawned future will
// return Poll::Ready, because we are dropping the
// sender right here:
senders.remove(&chat_id);
// We already removed a dialogue from `storage` (see
// the beginning of this async block).
}
}
}
}));
tx
} }
} }
impl<'a, D, H, Upd> CtxHandler<DispatcherHandlerCtx<Upd>, Result<(), ()>> async fn update_dialogue<D>(
for DialogueDispatcher<'a, D, H> storage: Arc<dyn Storage<D> + Send + Sync + 'static>,
where chat_id: i64,
H: CtxHandler<DialogueHandlerCtx<Upd, D>, DialogueStage<D>>, new_dialogue: D,
Upd: GetChatId, ) where
D: Default, D: 'static + Send + Sync,
{ {
fn handle_ctx<'b>( if storage
&'b self, .update_dialogue(chat_id, new_dialogue)
ctx: DispatcherHandlerCtx<Upd>, .await
) -> Pin<Box<dyn Future<Output = Result<(), ()>> + 'b>> .is_some()
where
Upd: 'b,
{ {
Box::pin(async move { panic!(
"Oops, you have an bug in your Storage: update_dialogue returns \
Some after remove_dialogue"
);
}
}
impl<D, H, Upd> DispatcherHandler<Upd> for DialogueDispatcher<D, H, Upd>
where
H: DialogueDispatcherHandler<Upd, D> + Send + Sync + 'static,
Upd: GetChatId + Send + Sync + 'static,
D: Default + Send + Sync + 'static,
{
fn handle<'a>(
&'a self,
updates: mpsc::UnboundedReceiver<DispatcherHandlerCtx<Upd>>,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'a>>
where
DispatcherHandlerCtx<Upd>: 'a,
{
Box::pin(updates.for_each(move |ctx| {
let chat_id = ctx.update.chat_id(); let chat_id = ctx.update.chat_id();
let dialogue = self match self.senders.get(&chat_id) {
.storage // An old dialogue
.remove_dialogue(chat_id) Some(tx) => {
.await if let Err(_) = tx.1.send(ctx) {
.unwrap_or_default(); panic!(
"We are not dropping a receiver or call .close() \
if let DialogueStage::Next(new_dialogue) = self on it",
.handler );
.handle_ctx(DialogueHandlerCtx { }
bot: ctx.bot, }
update: ctx.update, None => {
dialogue, let tx = self.new_tx();
}) if let Err(_) = tx.send(ctx) {
.await panic!(
{ "We are not dropping a receiver or call .close() \
if self on it",
.storage );
.update_dialogue(chat_id, new_dialogue) }
.await self.senders.insert(chat_id, tx);
.is_some()
{
panic!(
"We previously storage.remove_dialogue() so \
storage.update_dialogue() must return None"
);
} }
} }
Ok(()) async { () }
}) }))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Bot;
use futures::{stream, StreamExt};
use lazy_static::lazy_static;
use tokio::{
sync::{mpsc, Mutex},
time::{delay_for, Duration},
};
#[tokio::test]
async fn updates_from_same_chat_executed_sequentially() {
#[derive(Debug)]
struct MyUpdate {
chat_id: i64,
unique_number: u32,
};
impl MyUpdate {
fn new(chat_id: i64, unique_number: u32) -> Self {
Self {
chat_id,
unique_number,
}
}
}
impl GetChatId for MyUpdate {
fn chat_id(&self) -> i64 {
self.chat_id
}
}
lazy_static! {
static ref SEQ1: Mutex<Vec<u32>> = Mutex::new(Vec::new());
static ref SEQ2: Mutex<Vec<u32>> = Mutex::new(Vec::new());
static ref SEQ3: Mutex<Vec<u32>> = Mutex::new(Vec::new());
}
let dispatcher = DialogueDispatcher::new(
|ctx: DialogueDispatcherHandlerCtx<MyUpdate, ()>| async move {
delay_for(Duration::from_millis(300)).await;
match ctx.update {
MyUpdate {
chat_id: 1,
unique_number,
} => {
SEQ1.lock().await.push(unique_number);
}
MyUpdate {
chat_id: 2,
unique_number,
} => {
SEQ2.lock().await.push(unique_number);
}
MyUpdate {
chat_id: 3,
unique_number,
} => {
SEQ3.lock().await.push(unique_number);
}
_ => unreachable!(),
}
DialogueStage::Next(())
},
);
let updates = stream::iter(
vec![
MyUpdate::new(1, 174),
MyUpdate::new(1, 125),
MyUpdate::new(2, 411),
MyUpdate::new(1, 2),
MyUpdate::new(2, 515),
MyUpdate::new(2, 623),
MyUpdate::new(1, 193),
MyUpdate::new(1, 104),
MyUpdate::new(2, 2222),
MyUpdate::new(2, 737),
MyUpdate::new(3, 72782),
MyUpdate::new(3, 2737),
MyUpdate::new(1, 7),
MyUpdate::new(1, 7778),
MyUpdate::new(3, 5475),
MyUpdate::new(3, 1096),
MyUpdate::new(3, 872),
MyUpdate::new(2, 10),
MyUpdate::new(2, 55456),
MyUpdate::new(3, 5665),
MyUpdate::new(3, 1611),
]
.into_iter()
.map(|update| DispatcherHandlerCtx {
update,
bot: Bot::new("Doesn't matter here"),
})
.collect::<Vec<DispatcherHandlerCtx<MyUpdate>>>(),
);
let (tx, rx) = mpsc::unbounded_channel();
updates
.for_each(move |update| {
let tx = tx.clone();
async move {
if let Err(_) = tx.send(update) {
panic!("tx.send(update) failed");
}
}
})
.await;
dispatcher.handle(rx).await;
// Wait until our futures to be finished.
delay_for(Duration::from_millis(3000)).await;
assert_eq!(*SEQ1.lock().await, vec![174, 125, 2, 193, 104, 7, 7778]);
assert_eq!(
*SEQ2.lock().await,
vec![411, 515, 623, 2222, 737, 10, 55456]
);
assert_eq!(
*SEQ3.lock().await,
vec![72782, 2737, 5475, 1096, 872, 5665, 1611]
);
} }
} }

View file

@ -0,0 +1,36 @@
use std::pin::Pin;
use crate::prelude::{DialogueDispatcherHandlerCtx, DialogueStage};
use std::future::Future;
/// An asynchronous handler of an update used in [`DialogueDispatcher`].
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
///
/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher
pub trait DialogueDispatcherHandler<Upd, D> {
#[must_use]
fn handle<'a>(
&'a self,
ctx: DialogueDispatcherHandlerCtx<Upd, D>,
) -> Pin<Box<dyn Future<Output = DialogueStage<D>> + Send + Sync + 'a>>
where
DialogueDispatcherHandlerCtx<Upd, D>: Send + Sync + 'a;
}
impl<Upd, D, F, Fut> DialogueDispatcherHandler<Upd, D> for F
where
F: Fn(DialogueDispatcherHandlerCtx<Upd, D>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + Sync + 'static,
{
fn handle<'a>(
&'a self,
ctx: DialogueDispatcherHandlerCtx<Upd, D>,
) -> Pin<Box<dyn Future<Output = Fut::Output> + Send + Sync + 'a>>
where
DialogueDispatcherHandlerCtx<Upd, D>: Send + Sync + 'a,
{
Box::pin(async move { self(ctx).await })
}
}

View file

@ -13,14 +13,18 @@ use std::sync::Arc;
/// A context of a [`DialogueDispatcher`]'s message handler. /// A context of a [`DialogueDispatcher`]'s message handler.
/// ///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
///
/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher /// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher
pub struct DialogueHandlerCtx<Upd, D> { #[derive(Debug)]
pub struct DialogueDispatcherHandlerCtx<Upd, D> {
pub bot: Arc<Bot>, pub bot: Arc<Bot>,
pub update: Upd, pub update: Upd,
pub dialogue: D, pub dialogue: D,
} }
impl<Upd, D> DialogueHandlerCtx<Upd, D> { impl<Upd, D> DialogueDispatcherHandlerCtx<Upd, D> {
/// Creates a new instance with the provided fields. /// Creates a new instance with the provided fields.
pub fn new(bot: Arc<Bot>, update: Upd, dialogue: D) -> Self { pub fn new(bot: Arc<Bot>, update: Upd, dialogue: D) -> Self {
Self { Self {
@ -35,8 +39,8 @@ impl<Upd, D> DialogueHandlerCtx<Upd, D> {
pub fn with_new_dialogue<Nd>( pub fn with_new_dialogue<Nd>(
self, self,
new_dialogue: Nd, new_dialogue: Nd,
) -> DialogueHandlerCtx<Upd, Nd> { ) -> DialogueDispatcherHandlerCtx<Upd, Nd> {
DialogueHandlerCtx { DialogueDispatcherHandlerCtx {
bot: self.bot, bot: self.bot,
update: self.update, update: self.update,
dialogue: new_dialogue, dialogue: new_dialogue,
@ -44,7 +48,7 @@ impl<Upd, D> DialogueHandlerCtx<Upd, D> {
} }
} }
impl<Upd, D> GetChatId for DialogueHandlerCtx<Upd, D> impl<Upd, D> GetChatId for DialogueDispatcherHandlerCtx<Upd, D>
where where
Upd: GetChatId, Upd: GetChatId,
{ {
@ -53,7 +57,7 @@ where
} }
} }
impl<D> DialogueHandlerCtx<Message, D> { impl<D> DialogueDispatcherHandlerCtx<Message, D> {
pub fn answer<T>(&self, text: T) -> SendMessage pub fn answer<T>(&self, text: T) -> SendMessage
where where
T: Into<String>, T: Into<String>,

View file

@ -1,4 +1,7 @@
/// Continue or terminate a dialogue. /// Continue or terminate a dialogue.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] #[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
pub enum DialogueStage<D> { pub enum DialogueStage<D> {
Next(D), Next(D),
@ -6,11 +9,17 @@ pub enum DialogueStage<D> {
} }
/// A shortcut for `Ok(DialogueStage::Next(dialogue))`. /// A shortcut for `Ok(DialogueStage::Next(dialogue))`.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
pub fn next<E, D>(dialogue: D) -> Result<DialogueStage<D>, E> { pub fn next<E, D>(dialogue: D) -> Result<DialogueStage<D>, E> {
Ok(DialogueStage::Next(dialogue)) Ok(DialogueStage::Next(dialogue))
} }
/// A shortcut for `Ok(DialogueStage::Exit)`. /// A shortcut for `Ok(DialogueStage::Exit)`.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
pub fn exit<E, D>() -> Result<DialogueStage<D>, E> { pub fn exit<E, D>() -> Result<DialogueStage<D>, E> {
Ok(DialogueStage::Exit) Ok(DialogueStage::Exit)
} }

View file

@ -4,45 +4,54 @@
//! //!
//! 1. Your type `D`, which designates a dialogue state at the current //! 1. Your type `D`, which designates a dialogue state at the current
//! moment. //! moment.
//! 2. [`Storage`], which encapsulates all the dialogues. //! 2. [`Storage<D>`], which encapsulates all the dialogues.
//! 3. Your handler, which receives an update and turns your dialogue into the //! 3. Your handler, which receives an update and turns your dialogue into the
//! next state. //! next state ([`DialogueDispatcherHandlerCtx<YourUpdate, D>`] ->
//! 4. [`DialogueDispatcher`], which encapsulates your handler, [`Storage`], //! [`DialogueStage<D>`]).
//! and implements [`CtxHandler`]. //! 4. [`DialogueDispatcher`], which encapsulates your handler, [`Storage<D>`],
//! and implements [`DispatcherHandler`].
//! //!
//! You supply [`DialogueDispatcher`] into [`Dispatcher`]. Every time //! For example, you supply [`DialogueDispatcher`] into
//! [`Dispatcher`] calls `DialogueDispatcher::handle_ctx(...)`, the following //! [`Dispatcher::messages_handler`]. Every time [`Dispatcher`] sees an incoming
//! steps are executed: //! [`UpdateKind::Message(message)`], `message` is transferred into
//! [`DialogueDispatcher`]. After this, following steps are executed:
//! //!
//! 1. If a storage doesn't contain a dialogue from this chat, supply //! 1. If a storage doesn't contain a dialogue from this chat, supply
//! `D::default()` into you handler, otherwise, supply the saved session //! `D::default()` into you handler, otherwise, supply the saved dialogue
//! from this chat. //! from this chat.
//! 2. If a handler has returned [`DialogueStage::Exit`], remove the session //! 2. If a handler has returned [`DialogueStage::Exit`], remove the dialogue
//! from the storage, otherwise ([`DialogueStage::Next`]) force the storage to //! from the storage, otherwise ([`DialogueStage::Next`]) force the storage to
//! update the session. //! update the dialogue.
//! //!
//! Please, see [examples/dialogue_bot] as an example. //! Please, see [examples/dialogue_bot] as an example.
//! //!
//! [`Storage`]: crate::dispatching::dialogue::Storage //! [`Storage<D>`]: crate::dispatching::dialogue::Storage
//! [`DialogueStage<D>`]: crate::dispatching::dialogue::DialogueStage
//! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher //! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher
//! [`DialogueStage::Exit`]: //! [`DialogueStage::Exit`]:
//! crate::dispatching::dialogue::DialogueStage::Exit //! crate::dispatching::dialogue::DialogueStage::Exit
//! [`DialogueStage::Next`]: crate::dispatching::dialogue::DialogueStage::Next //! [`DialogueStage::Next`]: crate::dispatching::dialogue::DialogueStage::Next
//! [`CtxHandler`]: crate::dispatching::CtxHandler //! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler
//! [`Dispatcher`]: crate::dispatching::Dispatcher //! [`Dispatcher`]: crate::dispatching::Dispatcher
//! [`Dispatcher::messages_handler`]:
//! crate::dispatching::Dispatcher::messages_handler
//! [`UpdateKind::Message(message)`]: crate::types::UpdateKind::Message
//! [`DialogueDispatcherHandlerCtx<YourUpdate, D>`]:
//! crate::dispatching::dialogue::DialogueDispatcherHandlerCtx
//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot //! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot
#![allow(clippy::module_inception)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
mod dialogue_dispatcher; mod dialogue_dispatcher;
mod dialogue_handler_ctx; mod dialogue_dispatcher_handler;
mod dialogue_dispatcher_handler_ctx;
mod dialogue_stage; mod dialogue_stage;
mod get_chat_id; mod get_chat_id;
mod storage; mod storage;
pub use dialogue_dispatcher::DialogueDispatcher; pub use dialogue_dispatcher::DialogueDispatcher;
pub use dialogue_handler_ctx::DialogueHandlerCtx; pub use dialogue_dispatcher_handler::DialogueDispatcherHandler;
pub use dialogue_dispatcher_handler_ctx::DialogueDispatcherHandlerCtx;
pub use dialogue_stage::{exit, next, DialogueStage}; pub use dialogue_stage::{exit, next, DialogueStage};
pub use get_chat_id::GetChatId; pub use get_chat_id::GetChatId;
pub use storage::{InMemStorage, Storage}; pub use storage::{InMemStorage, Storage};

View file

@ -1,7 +1,5 @@
use async_trait::async_trait;
use super::Storage; use super::Storage;
use std::collections::HashMap; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use tokio::sync::Mutex; use tokio::sync::Mutex;
/// A memory storage based on a hash map. Stores all the dialogues directly in /// A memory storage based on a hash map. Stores all the dialogues directly in
@ -11,19 +9,39 @@ use tokio::sync::Mutex;
/// All the dialogues will be lost after you restart your bot. If you need to /// All the dialogues will be lost after you restart your bot. If you need to
/// store them somewhere on a drive, you need to implement a storage /// store them somewhere on a drive, you need to implement a storage
/// communicating with a DB. /// communicating with a DB.
#[derive(Debug, Default)] #[derive(Debug)]
pub struct InMemStorage<D> { pub struct InMemStorage<D> {
map: Mutex<HashMap<i64, D>>, map: Mutex<HashMap<i64, D>>,
} }
#[async_trait(?Send)] impl<S> InMemStorage<S> {
#[async_trait] #[must_use]
impl<D> Storage<D> for InMemStorage<D> { pub fn new() -> Arc<Self> {
async fn remove_dialogue(&self, chat_id: i64) -> Option<D> { Arc::new(Self {
self.map.lock().await.remove(&chat_id) map: Mutex::new(HashMap::new()),
} })
}
async fn update_dialogue(&self, chat_id: i64, dialogue: D) -> Option<D> { }
self.map.lock().await.insert(chat_id, dialogue)
impl<D> Storage<D> for InMemStorage<D> {
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> Pin<Box<dyn Future<Output = Option<D>> + Send + Sync + 'static>>
where
D: Send + Sync + 'static,
{
Box::pin(async move { self.map.lock().await.remove(&chat_id) })
}
fn update_dialogue(
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> Pin<Box<dyn Future<Output = Option<D>> + Send + Sync + 'static>>
where
D: Send + Sync + 'static,
{
Box::pin(async move { self.map.lock().await.insert(chat_id, dialogue) })
} }
} }

View file

@ -1,7 +1,7 @@
mod in_mem_storage; mod in_mem_storage;
use async_trait::async_trait;
pub use in_mem_storage::InMemStorage; pub use in_mem_storage::InMemStorage;
use std::{future::Future, pin::Pin, sync::Arc};
/// A storage of dialogues. /// A storage of dialogues.
/// ///
@ -11,18 +11,27 @@ pub use in_mem_storage::InMemStorage;
/// For a storage based on a simple hash map, see [`InMemStorage`]. /// For a storage based on a simple hash map, see [`InMemStorage`].
/// ///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[async_trait(?Send)]
#[async_trait]
pub trait Storage<D> { pub trait Storage<D> {
/// Removes a dialogue with the specified `chat_id`. /// Removes a dialogue with the specified `chat_id`.
/// ///
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a /// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
/// `dialogue` was deleted. /// `dialogue` was deleted.
async fn remove_dialogue(&self, chat_id: i64) -> Option<D>; fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> Pin<Box<dyn Future<Output = Option<D>> + Send + Sync + 'static>>
where
D: Send + Sync + 'static;
/// Updates a dialogue with the specified `chat_id`. /// Updates a dialogue with the specified `chat_id`.
/// ///
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a /// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
/// `dialogue` was updated. /// `dialogue` was updated.
async fn update_dialogue(&self, chat_id: i64, dialogue: D) -> Option<D>; fn update_dialogue(
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> Pin<Box<dyn Future<Output = Option<D>> + Send + Sync + 'static>>
where
D: Send + Sync + 'static;
} }

View file

@ -1,215 +1,205 @@
use crate::{ use crate::{
dispatching::{ dispatching::{
error_handlers::ErrorHandler, update_listeners, error_handlers::ErrorHandler, update_listeners,
update_listeners::UpdateListener, CtxHandler, DispatcherHandlerCtx, update_listeners::UpdateListener, DispatcherHandler,
DispatcherHandlerResult, LoggingErrorHandler, DispatcherHandlerCtx, LoggingErrorHandler,
}, },
types::{ types::{
CallbackQuery, ChosenInlineResult, InlineQuery, Message, Poll, CallbackQuery, ChosenInlineResult, InlineQuery, Message, Poll,
PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind,
}, },
Bot, RequestError, Bot,
}; };
use futures::{stream, StreamExt}; use futures::StreamExt;
use std::{fmt::Debug, future::Future, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use tokio::sync::mpsc;
type Handlers<'a, Upd, HandlerE> = Vec< use tokio::sync::Mutex;
Box<
dyn CtxHandler< type Tx<Upd> = Option<Mutex<mpsc::UnboundedSender<DispatcherHandlerCtx<Upd>>>>;
DispatcherHandlerCtx<Upd>,
DispatcherHandlerResult<Upd, HandlerE>, #[macro_use]
> + 'a, mod macros {
>, /// Pushes an update to a queue.
>; macro_rules! send {
($bot:expr, $tx:expr, $update:expr, $variant:expr) => {
send($bot, $tx, $update, stringify!($variant)).await;
};
}
}
async fn send<'a, Upd>(
bot: &'a Arc<Bot>,
tx: &'a Tx<Upd>,
update: Upd,
variant: &'static str,
) where
Upd: Debug,
{
if let Some(tx) = tx {
if let Err(error) = tx.lock().await.send(DispatcherHandlerCtx {
bot: Arc::clone(&bot),
update,
}) {
log::error!(
"The RX part of the {} channel is closed, but an update is \
received.\nError:{}\n",
variant,
error
);
}
}
}
/// One dispatcher to rule them all. /// One dispatcher to rule them all.
/// ///
/// See [the module-level documentation for the design /// See [the module-level documentation for the design
/// overview](crate::dispatching). /// overview](crate::dispatching).
// HandlerE=RequestError doesn't work now, because of very poor type inference. pub struct Dispatcher {
// See https://github.com/rust-lang/rust/issues/27336 for more details.
pub struct Dispatcher<'a, HandlerE = RequestError> {
bot: Arc<Bot>, bot: Arc<Bot>,
handlers_error_handler: Box<dyn ErrorHandler<HandlerE> + 'a>, messages_queue: Tx<Message>,
edited_messages_queue: Tx<Message>,
update_handlers: Handlers<'a, Update, HandlerE>, channel_posts_queue: Tx<Message>,
message_handlers: Handlers<'a, Message, HandlerE>, edited_channel_posts_queue: Tx<Message>,
edited_message_handlers: Handlers<'a, Message, HandlerE>, inline_queries_queue: Tx<InlineQuery>,
channel_post_handlers: Handlers<'a, Message, HandlerE>, chosen_inline_results_queue: Tx<ChosenInlineResult>,
edited_channel_post_handlers: Handlers<'a, Message, HandlerE>, callback_queries_queue: Tx<CallbackQuery>,
inline_query_handlers: Handlers<'a, InlineQuery, HandlerE>, shipping_queries_queue: Tx<ShippingQuery>,
chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, HandlerE>, pre_checkout_queries_queue: Tx<PreCheckoutQuery>,
callback_query_handlers: Handlers<'a, CallbackQuery, HandlerE>, polls_queue: Tx<Poll>,
shipping_query_handlers: Handlers<'a, ShippingQuery, HandlerE>, poll_answers_queue: Tx<PollAnswer>,
pre_checkout_query_handlers: Handlers<'a, PreCheckoutQuery, HandlerE>,
poll_handlers: Handlers<'a, Poll, HandlerE>,
poll_answer_handlers: Handlers<'a, PollAnswer, HandlerE>,
} }
impl<'a, HandlerE> Dispatcher<'a, HandlerE> impl Dispatcher {
where /// Constructs a new dispatcher with the specified `bot`.
HandlerE: Debug + 'a,
{
/// Constructs a new dispatcher with this `bot`.
#[must_use] #[must_use]
pub fn new(bot: Arc<Bot>) -> Self { pub fn new(bot: Arc<Bot>) -> Self {
Self { Self {
bot, bot,
handlers_error_handler: Box::new(LoggingErrorHandler::new( messages_queue: None,
"An error from a Dispatcher's handler", edited_messages_queue: None,
)), channel_posts_queue: None,
update_handlers: Vec::new(), edited_channel_posts_queue: None,
message_handlers: Vec::new(), inline_queries_queue: None,
edited_message_handlers: Vec::new(), chosen_inline_results_queue: None,
channel_post_handlers: Vec::new(), callback_queries_queue: None,
edited_channel_post_handlers: Vec::new(), shipping_queries_queue: None,
inline_query_handlers: Vec::new(), pre_checkout_queries_queue: None,
chosen_inline_result_handlers: Vec::new(), polls_queue: None,
callback_query_handlers: Vec::new(), poll_answers_queue: None,
shipping_query_handlers: Vec::new(),
pre_checkout_query_handlers: Vec::new(),
poll_handlers: Vec::new(),
poll_answer_handlers: Vec::new(),
} }
} }
/// Registers a handler of errors, produced by other handlers.
#[must_use] #[must_use]
pub fn handlers_error_handler<T>(mut self, val: T) -> Self fn new_tx<H, Upd>(&self, h: H) -> Tx<Upd>
where where
T: ErrorHandler<HandlerE> + 'a, H: DispatcherHandler<Upd> + Send + Sync + 'static,
Upd: Send + Sync + 'static,
{ {
self.handlers_error_handler = Box::new(val); let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
h.handle(rx).await;
});
Some(Mutex::new(tx))
}
#[must_use]
pub fn messages_handler<H, I>(mut self, h: H) -> Self
where
H: DispatcherHandler<Message> + 'static + Send + Sync,
{
self.messages_queue = self.new_tx(h);
self self
} }
#[must_use] #[must_use]
pub fn update_handler<H, I>(mut self, h: &'a H) -> Self pub fn edited_messages_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<Update>, I> + 'a, H: DispatcherHandler<Message> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<Update, HandlerE>> + 'a,
{ {
self.update_handlers = register_handler(self.update_handlers, h); self.edited_messages_queue = self.new_tx(h);
self self
} }
#[must_use] #[must_use]
pub fn message_handler<H, I>(mut self, h: &'a H) -> Self pub fn channel_posts_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<Message>, I> + 'a, H: DispatcherHandler<Message> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<Message, HandlerE>> + 'a,
{ {
self.message_handlers = register_handler(self.message_handlers, h); self.channel_posts_queue = self.new_tx(h);
self self
} }
#[must_use] #[must_use]
pub fn edited_message_handler<H, I>(mut self, h: &'a H) -> Self pub fn edited_channel_posts_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<Message>, I> + 'a, H: DispatcherHandler<Message> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<Message, HandlerE>> + 'a,
{ {
self.edited_message_handlers = self.edited_channel_posts_queue = self.new_tx(h);
register_handler(self.edited_message_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn channel_post_handler<H, I>(mut self, h: &'a H) -> Self pub fn inline_queries_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<Message>, I> + 'a, H: DispatcherHandler<InlineQuery> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<Message, HandlerE>> + 'a,
{ {
self.channel_post_handlers = self.inline_queries_queue = self.new_tx(h);
register_handler(self.channel_post_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn edited_channel_post_handler<H, I>(mut self, h: &'a H) -> Self pub fn chosen_inline_results_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<Message>, I> + 'a, H: DispatcherHandler<ChosenInlineResult> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<Message, HandlerE>> + 'a,
{ {
self.edited_channel_post_handlers = self.chosen_inline_results_queue = self.new_tx(h);
register_handler(self.edited_channel_post_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn inline_query_handler<H, I>(mut self, h: &'a H) -> Self pub fn callback_queries_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<InlineQuery>, I> + 'a, H: DispatcherHandler<CallbackQuery> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<InlineQuery, HandlerE>> + 'a,
{ {
self.inline_query_handlers = self.callback_queries_queue = self.new_tx(h);
register_handler(self.inline_query_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn chosen_inline_result_handler<H, I>(mut self, h: &'a H) -> Self pub fn shipping_queries_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<ChosenInlineResult>, I> + 'a, H: DispatcherHandler<ShippingQuery> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<ChosenInlineResult, HandlerE>> + 'a,
{ {
self.chosen_inline_result_handlers = self.shipping_queries_queue = self.new_tx(h);
register_handler(self.chosen_inline_result_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn callback_query_handler<H, I>(mut self, h: &'a H) -> Self pub fn pre_checkout_queries_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<CallbackQuery>, I> + 'a, H: DispatcherHandler<PreCheckoutQuery> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<CallbackQuery, HandlerE>> + 'a,
{ {
self.callback_query_handlers = self.pre_checkout_queries_queue = self.new_tx(h);
register_handler(self.callback_query_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn shipping_query_handler<H, I>(mut self, h: &'a H) -> Self pub fn polls_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<ShippingQuery>, I> + 'a, H: DispatcherHandler<Poll> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<ShippingQuery, HandlerE>> + 'a,
{ {
self.shipping_query_handlers = self.polls_queue = self.new_tx(h);
register_handler(self.shipping_query_handlers, h);
self self
} }
#[must_use] #[must_use]
pub fn pre_checkout_query_handler<H, I>(mut self, h: &'a H) -> Self pub fn poll_answers_handler<H, I>(mut self, h: H) -> Self
where where
H: CtxHandler<DispatcherHandlerCtx<PreCheckoutQuery>, I> + 'a, H: DispatcherHandler<PollAnswer> + 'static + Send + Sync,
I: Into<DispatcherHandlerResult<PreCheckoutQuery, HandlerE>> + 'a,
{ {
self.pre_checkout_query_handlers = self.poll_answers_queue = self.new_tx(h);
register_handler(self.pre_checkout_query_handlers, h);
self
}
#[must_use]
pub fn poll_handler<H, I>(mut self, h: &'a H) -> Self
where
H: CtxHandler<DispatcherHandlerCtx<Poll>, I> + 'a,
I: Into<DispatcherHandlerResult<Poll, HandlerE>> + 'a,
{
self.poll_handlers = register_handler(self.poll_handlers, h);
self
}
#[must_use]
pub fn poll_answer_handler<H, I>(mut self, h: &'a H) -> Self
where
H: CtxHandler<DispatcherHandlerCtx<PollAnswer>, I> + 'a,
I: Into<DispatcherHandlerResult<PollAnswer, HandlerE>> + 'a,
{
self.poll_answer_handlers =
register_handler(self.poll_answer_handlers, h);
self self
} }
@ -217,7 +207,7 @@ where
/// ///
/// The default parameters are a long polling update listener and log all /// The default parameters are a long polling update listener and log all
/// errors produced by this listener). /// errors produced by this listener).
pub async fn dispatch(&'a self) { pub async fn dispatch(&self) {
self.dispatch_with_listener( self.dispatch_with_listener(
update_listeners::polling_default(Arc::clone(&self.bot)), update_listeners::polling_default(Arc::clone(&self.bot)),
&LoggingErrorHandler::new("An error from the update listener"), &LoggingErrorHandler::new("An error from the update listener"),
@ -227,7 +217,7 @@ where
/// Starts your bot with custom `update_listener` and /// Starts your bot with custom `update_listener` and
/// `update_listener_error_handler`. /// `update_listener_error_handler`.
pub async fn dispatch_with_listener<UListener, ListenerE, Eh>( pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a self, &'a self,
update_listener: UListener, update_listener: UListener,
update_listener_error_handler: &'a Eh, update_listener_error_handler: &'a Eh,
@ -239,7 +229,7 @@ where
let update_listener = Box::pin(update_listener); let update_listener = Box::pin(update_listener);
update_listener update_listener
.for_each_concurrent(None, move |update| async move { .for_each(move |update| async move {
log::trace!("Dispatcher received an update: {:?}", update); log::trace!("Dispatcher received an update: {:?}", update);
let update = match update { let update = match update {
@ -250,132 +240,97 @@ where
} }
}; };
let update =
match self.handle(&self.update_handlers, update).await {
Some(update) => update,
None => return,
};
match update.kind { match update.kind {
UpdateKind::Message(message) => { UpdateKind::Message(message) => {
self.handle(&self.message_handlers, message).await; send!(
&self.bot,
&self.messages_queue,
message,
UpdateKind::Message
);
} }
UpdateKind::EditedMessage(message) => { UpdateKind::EditedMessage(message) => {
self.handle(&self.edited_message_handlers, message) send!(
.await; &self.bot,
&self.edited_messages_queue,
message,
UpdateKind::EditedMessage
);
} }
UpdateKind::ChannelPost(post) => { UpdateKind::ChannelPost(post) => {
self.handle(&self.channel_post_handlers, post).await; send!(
&self.bot,
&self.channel_posts_queue,
post,
UpdateKind::ChannelPost
);
} }
UpdateKind::EditedChannelPost(post) => { UpdateKind::EditedChannelPost(post) => {
self.handle(&self.edited_channel_post_handlers, post) send!(
.await; &self.bot,
&self.edited_channel_posts_queue,
post,
UpdateKind::EditedChannelPost
);
} }
UpdateKind::InlineQuery(query) => { UpdateKind::InlineQuery(query) => {
self.handle(&self.inline_query_handlers, query).await; send!(
&self.bot,
&self.inline_queries_queue,
query,
UpdateKind::InlineQuery
);
} }
UpdateKind::ChosenInlineResult(result) => { UpdateKind::ChosenInlineResult(result) => {
self.handle( send!(
&self.chosen_inline_result_handlers, &self.bot,
&self.chosen_inline_results_queue,
result, result,
) UpdateKind::ChosenInlineResult
.await; );
} }
UpdateKind::CallbackQuery(query) => { UpdateKind::CallbackQuery(query) => {
self.handle(&self.callback_query_handlers, query).await; send!(
&self.bot,
&self.callback_queries_queue,
query,
UpdateKind::CallbackQuer
);
} }
UpdateKind::ShippingQuery(query) => { UpdateKind::ShippingQuery(query) => {
self.handle(&self.shipping_query_handlers, query).await; send!(
&self.bot,
&self.shipping_queries_queue,
query,
UpdateKind::ShippingQuery
);
} }
UpdateKind::PreCheckoutQuery(query) => { UpdateKind::PreCheckoutQuery(query) => {
self.handle(&self.pre_checkout_query_handlers, query) send!(
.await; &self.bot,
&self.pre_checkout_queries_queue,
query,
UpdateKind::PreCheckoutQuery
);
} }
UpdateKind::Poll(poll) => { UpdateKind::Poll(poll) => {
self.handle(&self.poll_handlers, poll).await; send!(
&self.bot,
&self.polls_queue,
poll,
UpdateKind::Poll
);
} }
UpdateKind::PollAnswer(answer) => { UpdateKind::PollAnswer(answer) => {
self.handle(&self.poll_answer_handlers, answer).await; send!(
} &self.bot,
} &self.poll_answers_queue,
}) answer,
.await UpdateKind::PollAnswer
} );
// Handles a single update.
#[allow(clippy::ptr_arg)]
async fn handle<Upd>(
&self,
handlers: &Handlers<'a, Upd, HandlerE>,
update: Upd,
) -> Option<Upd> {
stream::iter(handlers)
.fold(Some(update), |acc, handler| {
async move {
// Option::and_then is not working here, because
// Middleware::handle is asynchronous.
match acc {
Some(update) => {
let DispatcherHandlerResult { next, result } =
handler
.handle_ctx(DispatcherHandlerCtx {
bot: Arc::clone(&self.bot),
update,
})
.await;
if let Err(error) = result {
self.handlers_error_handler
.handle_error(error)
.await
}
next
}
None => None,
} }
} }
}) })
.await .await
} }
} }
/// Transforms Future<Output = T> into Future<Output = U> by applying an Into
/// conversion.
async fn intermediate_fut0<T, U>(fut: impl Future<Output = T>) -> U
where
T: Into<U>,
{
fut.await.into()
}
/// Transforms CtxHandler with Into<DispatcherHandlerResult<...>> as a return
/// value into CtxHandler with DispatcherHandlerResult return value.
fn intermediate_fut1<'a, Upd, HandlerE, H, I>(
h: &'a H,
) -> impl CtxHandler<
DispatcherHandlerCtx<Upd>,
DispatcherHandlerResult<Upd, HandlerE>,
> + 'a
where
H: CtxHandler<DispatcherHandlerCtx<Upd>, I> + 'a,
I: Into<DispatcherHandlerResult<Upd, HandlerE>> + 'a,
Upd: 'a,
{
move |ctx| intermediate_fut0(h.handle_ctx(ctx))
}
/// Registers a single handler.
fn register_handler<'a, Upd, H, I, HandlerE>(
mut handlers: Handlers<'a, Upd, HandlerE>,
h: &'a H,
) -> Handlers<'a, Upd, HandlerE>
where
H: CtxHandler<DispatcherHandlerCtx<Upd>, I> + 'a,
I: Into<DispatcherHandlerResult<Upd, HandlerE>> + 'a,
HandlerE: 'a,
Upd: 'a,
{
handlers.push(Box::new(intermediate_fut1(h)));
handlers
}

View file

@ -0,0 +1,35 @@
use std::{future::Future, pin::Pin};
use crate::dispatching::{DispatcherHandlerCtx, DispatcherHandlerRx};
/// An asynchronous handler of a stream of updates used in [`Dispatcher`].
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching).
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
pub trait DispatcherHandler<Upd> {
#[must_use]
fn handle<'a>(
&'a self,
updates: DispatcherHandlerRx<Upd>,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'a>>
where
DispatcherHandlerCtx<Upd>: Send + Sync + 'a;
}
impl<Upd, F, Fut> DispatcherHandler<Upd> for F
where
F: Fn(DispatcherHandlerRx<Upd>) -> Fut + Send + Sync + Sync + 'static,
Fut: Future<Output = ()> + Send + Sync + 'static,
{
fn handle<'a>(
&'a self,
updates: DispatcherHandlerRx<Upd>,
) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'a>>
where
DispatcherHandlerCtx<Upd>: Send + Sync + 'a,
{
Box::pin(async move { self(updates).await })
}
}

View file

@ -17,6 +17,7 @@ use std::sync::Arc;
/// overview](crate::dispatching). /// overview](crate::dispatching).
/// ///
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher
#[derive(Debug)]
pub struct DispatcherHandlerCtx<Upd> { pub struct DispatcherHandlerCtx<Upd> {
pub bot: Arc<Bot>, pub bot: Arc<Bot>,
pub update: Upd, pub update: Upd,

View file

@ -1,31 +0,0 @@
/// A result of a handler in [`Dispatcher`].
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching).
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
pub struct DispatcherHandlerResult<Upd, E> {
pub next: Option<Upd>,
pub result: Result<(), E>,
}
impl<Upd, E> DispatcherHandlerResult<Upd, E> {
/// Creates new `DispatcherHandlerResult` that continues the pipeline.
pub fn next(update: Upd, result: Result<(), E>) -> Self {
Self {
next: Some(update),
result,
}
}
/// Creates new `DispatcherHandlerResult` that terminates the pipeline.
pub fn exit(result: Result<(), E>) -> Self {
Self { next: None, result }
}
}
impl<Upd, E> From<Result<(), E>> for DispatcherHandlerResult<Upd, E> {
fn from(result: Result<(), E>) -> Self {
Self::exit(result)
}
}

View file

@ -1,120 +1,72 @@
//! Updates dispatching. //! Updates dispatching.
//! //!
//! The key type here is [`Dispatcher`]. It encapsulates [`Bot`], handlers for //! The key type here is [`Dispatcher`]. It encapsulates [`Bot`] and handlers
//! [11 update kinds] (+ for [`Update`]) and [`ErrorHandler`] for them. When //! for [the 11 update kinds].
//! [`Update`] is received from Telegram, the following steps are executed:
//! //!
//! 1. It is supplied into an appropriate handler (the first ones is those who //! You can register a maximum of 11 handlers for [the 11 update kinds]. Every
//! accept [`Update`]). //! handler accept [`tokio::sync::mpsc::UnboundedReceiver`] (the RX halve of an
//! 2. If a handler failed, invoke [`ErrorHandler`] with the corresponding //! asynchronous unbounded MPSC channel). Inside a body of your handler, you
//! error. //! typically asynchronously concurrently iterate through updates like this:
//! 3. If a handler has returned [`DispatcherHandlerResult`] with `None`,
//! terminate the pipeline, otherwise supply an update into the next handler
//! (back to step 1).
//! //!
//! The pipeline is executed until either all the registered handlers were //! ```
//! executed, or one of handlers has terminated the pipeline. That's simple! //! use teloxide::prelude::*;
//! //!
//! 1. Note that handlers implement [`CtxHandler`], which means that you are //! async fn handle_messages(rx: DispatcherHandlerRx<Message>) {
//! able to supply [`DialogueDispatcher`] as a handler, since it implements //! rx.for_each_concurrent(None, |message| async move {
//! [`CtxHandler`] too! //! dbg!(message);
//! 2. Note that you don't always need to return [`DispatcherHandlerResult`] //! })
//! explicitly, because of automatic conversions. Just return `Result<(), E>` if //! .await;
//! you want to terminate the pipeline (see the example below). //! }
//! ```
//!
//! When [`Update`] is received from Telegram, [`Dispatcher`] pushes it into an
//! appropriate handler. That's simple!
//!
//! **Note** that handlers must implement [`DispatcherHandler`], which means
//! that:
//! - You are able to supply [`DialogueDispatcher`] as a handler.
//! - You are able to supply functions that accept
//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future<Output = ()`
//! as a handler.
//!
//! Since they implement [`DispatcherHandler`] too!
//! //!
//! # Examples //! # Examples
//! ### The ping-pong bot //! ### The ping-pong bot
//! //!
//! ```no_run
//! # #[tokio::main]
//! # async fn main_() {
//! use teloxide::prelude::*;
//!
//! // Setup logging here...
//!
//! // Create a dispatcher with a single message handler that answers "pong"
//! // to each incoming message.
//! Dispatcher::<RequestError>::new(Bot::from_env())
//! .message_handler(&|ctx: DispatcherHandlerCtx<Message>| async move {
//! ctx.answer("pong").send().await?;
//! Ok(())
//! })
//! .dispatch()
//! .await;
//! # }
//! ```
//!
//! [Full](https://github.com/teloxide/teloxide/blob/master/examples/ping_pong_bot/) //! [Full](https://github.com/teloxide/teloxide/blob/master/examples/ping_pong_bot/)
//! //!
//! ### Multiple handlers
//!
//! ```no_run
//! # #[tokio::main]
//! # async fn main_() {
//! use teloxide::prelude::*;
//!
//! // Create a dispatcher with multiple handlers of different types. This will
//! // print One! and Two! on every incoming UpdateKind::Message.
//! Dispatcher::<RequestError>::new(Bot::from_env())
//! // This is the first UpdateKind::Message handler, which will be called
//! // after the Update handler below.
//! .message_handler(&|ctx: DispatcherHandlerCtx<Message>| async move {
//! log::info!("Two!");
//! DispatcherHandlerResult::next(ctx.update, Ok(()))
//! })
//! // Remember: handler of Update are called first.
//! .update_handler(&|ctx: DispatcherHandlerCtx<Update>| async move {
//! log::info!("One!");
//! DispatcherHandlerResult::next(ctx.update, Ok(()))
//! })
//! // This handler will be called right after the first UpdateKind::Message
//! // handler, because it is registered after.
//! .message_handler(&|_ctx: DispatcherHandlerCtx<Message>| async move {
//! // The same as DispatcherHandlerResult::exit(Ok(()))
//! Ok(())
//! })
//! // This handler will never be called, because the UpdateKind::Message
//! // handler above terminates the pipeline.
//! .message_handler(&|ctx: DispatcherHandlerCtx<Message>| async move {
//! log::info!("This will never be printed!");
//! DispatcherHandlerResult::next(ctx.update, Ok(()))
//! })
//! .dispatch()
//! .await;
//!
//! // Note: if this bot receive, for example, UpdateKind::ChannelPost, it will
//! // only print "One!", because the UpdateKind::Message handlers will not be
//! // called.
//! # }
//! ```
//!
//! [Full](https://github.com/teloxide/teloxide/blob/master/examples/miltiple_handlers_bot/)
//!
//! For a bit more complicated example, please see [examples/dialogue_bot]. //! For a bit more complicated example, please see [examples/dialogue_bot].
//! //!
//! [`Dispatcher`]: crate::dispatching::Dispatcher //! [`Dispatcher`]: crate::dispatching::Dispatcher
//! [11 update kinds]: crate::types::UpdateKind //! [the 11 update kinds]: crate::types::UpdateKind
//! [`Update`]: crate::types::Update //! [`Update`]: crate::types::Update
//! [`ErrorHandler`]: crate::dispatching::ErrorHandler //! [`ErrorHandler`]: crate::dispatching::ErrorHandler
//! [`CtxHandler`]: crate::dispatching::CtxHandler //! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler
//! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher //! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher
//! [`DispatcherHandlerResult`]: crate::dispatching::DispatcherHandlerResult //! [`DispatcherHandlerResult`]: crate::dispatching::DispatcherHandlerResult
//! [`Bot`]: crate::Bot //! [`Bot`]: crate::Bot
//! [`tokio::sync::mpsc::UnboundedReceiver`]: https://docs.rs/tokio/0.2.11/tokio/sync/mpsc/struct.UnboundedReceiver.html
//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot //! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot
mod ctx_handlers;
pub mod dialogue; pub mod dialogue;
mod dispatcher; mod dispatcher;
mod dispatcher_handler;
mod dispatcher_handler_ctx; mod dispatcher_handler_ctx;
mod dispatcher_handler_result;
mod error_handlers; mod error_handlers;
pub mod update_listeners; pub mod update_listeners;
pub use ctx_handlers::CtxHandler;
pub use dispatcher::Dispatcher; pub use dispatcher::Dispatcher;
pub use dispatcher_handler::DispatcherHandler;
pub use dispatcher_handler_ctx::DispatcherHandlerCtx; pub use dispatcher_handler_ctx::DispatcherHandlerCtx;
pub use dispatcher_handler_result::DispatcherHandlerResult;
pub use error_handlers::{ pub use error_handlers::{
ErrorHandler, IgnoringErrorHandler, IgnoringErrorHandlerSafe, ErrorHandler, IgnoringErrorHandler, IgnoringErrorHandlerSafe,
LoggingErrorHandler, LoggingErrorHandler,
}; };
use tokio::sync::mpsc::UnboundedReceiver;
/// A type of a stream, consumed by [`Dispatcher`]'s handlers.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
pub type DispatcherHandlerRx<Upd> =
UnboundedReceiver<DispatcherHandlerCtx<Upd>>;

View file

@ -45,203 +45,6 @@
//! pretty_env_logger = "0.4.0" //! pretty_env_logger = "0.4.0"
//! ``` //! ```
//! //!
//! ## The ping-pong bot
//! This bot has a single message handler, which answers "pong" to each incoming
//! message:
//!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/ping_pong_bot/src/main.rs))
//! ```rust,no_run
//! use teloxide::prelude::*;
//!
//! # #[tokio::main]
//! # async fn main() {
//! teloxide::enable_logging!();
//! log::info!("Starting the ping-pong bot!");
//!
//! let bot = Bot::from_env();
//!
//! Dispatcher::<RequestError>::new(bot)
//! .message_handler(&|ctx: DispatcherHandlerCtx<Message>| async move {
//! ctx.answer("pong").send().await?;
//! Ok(())
//! })
//! .dispatch()
//! .await;
//! # }
//! ```
//!
//! <div align="center">
//! <img src=https://github.com/teloxide/teloxide/raw/master/media/PING_PONG_BOT.png width="400" />
//! </div>
//!
//! ## Commands
//! Commands are defined similar to how we define CLI using [structopt]. This
//! bot says "I am a cat! Meow!" on `/meow`, generates a random number within
//! [0; 1) on `/generate`, and shows the usage guide on `/help`:
//!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/simple_commands_bot/src/main.rs))
//! ```rust,no_run
//! # use teloxide::{prelude::*, utils::command::BotCommand};
//! # use rand::{thread_rng, Rng};
//! // Imports are omitted...
//!
//! #[derive(BotCommand)]
//! #[command(
//! rename = "lowercase",
//! description = "These commands are supported:"
//! )]
//! enum Command {
//! #[command(description = "display this text.")]
//! Help,
//! #[command(description = "be a cat.")]
//! Meow,
//! #[command(description = "generate a random number within [0; 1).")]
//! Generate,
//! }
//!
//! async fn handle_command(
//! ctx: DispatcherHandlerCtx<Message>,
//! ) -> Result<(), RequestError> {
//! let text = match ctx.update.text() {
//! Some(text) => text,
//! None => {
//! log::info!("Received a message, but not text.");
//! return Ok(());
//! }
//! };
//!
//! let command = match Command::parse(text) {
//! Some((command, _)) => command,
//! None => {
//! log::info!("Received a text message, but not a command.");
//! return Ok(());
//! }
//! };
//!
//! match command {
//! Command::Help => ctx.answer(Command::descriptions()).send().await?,
//! Command::Generate => {
//! ctx.answer(thread_rng().gen_range(0.0, 1.0).to_string())
//! .send()
//! .await?
//! }
//! Command::Meow => ctx.answer("I am a cat! Meow!").send().await?,
//! };
//!
//! Ok(())
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! // Setup is omitted...
//! # teloxide::enable_logging!();
//! # log::info!("Starting simple_commands_bot!");
//! #
//! # let bot = Bot::from_env();
//! #
//! # Dispatcher::<RequestError>::new(bot)
//! # .message_handler(&handle_command)
//! # .dispatch()
//! # .await;
//! }
//! ```
//!
//! <div align="center">
//! <img src=https://github.com/teloxide/teloxide/raw/master/media/SIMPLE_COMMANDS_BOT.png width="400" />
//! </div>
//!
//! ## Guess a number
//! Wanna see more? This is a bot, which starts a game on each incoming message.
//! You must guess a number from 1 to 10 (inclusively):
//!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/guess_a_number_bot/src/main.rs))
//! ```rust,no_run
//! # #[macro_use]
//! # extern crate smart_default;
//! # use teloxide::prelude::*;
//! # use rand::{thread_rng, Rng};
//! // Imports are omitted...
//!
//! #[derive(SmartDefault)]
//! enum Dialogue {
//! #[default]
//! Start,
//! ReceiveAttempt(u8),
//! }
//! async fn handle_message(
//! ctx: DialogueHandlerCtx<Message, Dialogue>,
//! ) -> Result<DialogueStage<Dialogue>, RequestError> {
//! match ctx.dialogue {
//! Dialogue::Start => {
//! ctx.answer(
//! "Let's play a game! Guess a number from 1 to 10
//! (inclusively).",
//! )
//! .send()
//! .await?;
//! next(Dialogue::ReceiveAttempt(thread_rng().gen_range(1, 11)))
//! }
//! Dialogue::ReceiveAttempt(secret) => match ctx.update.text() {
//! None => {
//! ctx.answer("Oh, please, send me a text message!")
//! .send()
//! .await?;
//! next(ctx.dialogue)
//! }
//! Some(text) => match text.parse::<u8>() {
//! Ok(attempt) => match attempt {
//! x if !(1..=10).contains(&x) => {
//! ctx.answer(
//! "Oh, please, send me a number in the range \
//! [1; 10]!",
//! )
//! .send()
//! .await?;
//! next(ctx.dialogue)
//! }
//! x if x == secret => {
//! ctx.answer("Congratulations! You won!")
//! .send()
//! .await?;
//! exit()
//! }
//! _ => {
//! ctx.answer("No.").send().await?;
//! next(ctx.dialogue)
//! }
//! },
//! Err(_) => {
//! ctx.answer(
//! "Oh, please, send me a number in the range [1; \
//! 10]!",
//! )
//! .send()
//! .await?;
//! next(ctx.dialogue)
//! }
//! },
//! },
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! # teloxide::enable_logging!();
//! # log::info!("Starting guess_a_number_bot!");
//! # let bot = Bot::from_env();
//! // Setup is omitted...
//!
//! Dispatcher::new(bot)
//! .message_handler(&DialogueDispatcher::new(|ctx| async move {
//! handle_message(ctx)
//! .await
//! .expect("Something wrong with the bot!")
//! }))
//! .dispatch()
//! .await;
//! }
//! ```
//!
//! <div align="center"> //! <div align="center">
//! <img src=https://github.com/teloxide/teloxide/raw/master/media/GUESS_A_NUMBER_BOT.png width="400" /> //! <img src=https://github.com/teloxide/teloxide/raw/master/media/GUESS_A_NUMBER_BOT.png width="400" />
//! </div> //! </div>

View file

@ -3,12 +3,16 @@
pub use crate::{ pub use crate::{
dispatching::{ dispatching::{
dialogue::{ dialogue::{
exit, next, DialogueDispatcher, DialogueHandlerCtx, DialogueStage, exit, next, DialogueDispatcher, DialogueDispatcherHandlerCtx,
GetChatId, DialogueStage, GetChatId,
}, },
Dispatcher, DispatcherHandlerCtx, DispatcherHandlerResult, Dispatcher, DispatcherHandlerCtx, DispatcherHandlerRx,
}, },
requests::{Request, ResponseResult}, requests::{Request, ResponseResult},
types::{Message, Update}, types::{Message, Update},
Bot, RequestError, Bot, RequestError,
}; };
pub use tokio::sync::mpsc::UnboundedReceiver;
pub use futures::StreamExt;