From 9eda683fc53fce98cf1d21212cfa114fc231dc63 Mon Sep 17 00:00:00 2001 From: Temirkhan Myrzamadi Date: Tue, 18 Feb 2020 04:19:16 +0600 Subject: [PATCH] Make handlers accept streams --- Cargo.toml | 4 +- examples/ping_pong_bot/src/main.rs | 11 +- src/dispatching/ctx_handlers.rs | 31 -- .../dialogue/dialogue_dispatcher.rs | 342 ++++++++++++--- .../dialogue/dialogue_dispatcher_handler.rs | 36 ++ ....rs => dialogue_dispatcher_handler_ctx.rs} | 16 +- src/dispatching/dialogue/dialogue_stage.rs | 9 + src/dispatching/dialogue/mod.rs | 39 +- .../dialogue/storage/in_mem_storage.rs | 44 +- src/dispatching/dialogue/storage/mod.rs | 19 +- src/dispatching/dispatcher.rs | 409 ++++++++---------- src/dispatching/dispatcher_handler.rs | 35 ++ src/dispatching/dispatcher_handler_ctx.rs | 1 + src/dispatching/dispatcher_handler_result.rs | 31 -- src/dispatching/mod.rs | 126 ++---- src/lib.rs | 197 --------- src/prelude.rs | 10 +- 17 files changed, 678 insertions(+), 682 deletions(-) delete mode 100644 src/dispatching/ctx_handlers.rs create mode 100644 src/dispatching/dialogue/dialogue_dispatcher_handler.rs rename src/dispatching/dialogue/{dialogue_handler_ctx.rs => dialogue_dispatcher_handler_ctx.rs} (91%) create mode 100644 src/dispatching/dispatcher_handler.rs delete mode 100644 src/dispatching/dispatcher_handler_result.rs diff --git a/Cargo.toml b/Cargo.toml index 05cefbae..09c57ec5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,11 +18,12 @@ maintenance = { status = "actively-developed" } serde_json = "1.0.44" serde = { version = "1.0.101", features = ["derive"] } -tokio = { version = "0.2.6", features = ["full"] } +tokio = { version = "0.2.11", features = ["full"] } tokio-util = { version = "0.2.0", features = ["full"] } reqwest = { version = "0.10", features = ["json", "stream", "native-tls-vendored"] } log = "0.4.8" +lockfree = "0.5.1" bytes = "0.5.3" mime = "0.3.16" @@ -40,3 +41,4 @@ teloxide-macros = { path = "teloxide-macros" } smart-default = "0.6.0" rand = "0.7.3" pretty_env_logger = "0.4.0" +lazy_static = "1.4.0" \ No newline at end of file diff --git a/examples/ping_pong_bot/src/main.rs b/examples/ping_pong_bot/src/main.rs index e731200c..49480c15 100644 --- a/examples/ping_pong_bot/src/main.rs +++ b/examples/ping_pong_bot/src/main.rs @@ -11,12 +11,11 @@ async fn run() { let bot = Bot::from_env(); - // Create a dispatcher with a single message handler that answers "pong" to - // each incoming message. - Dispatcher::::new(bot) - .message_handler(&|ctx: DispatcherHandlerCtx| async move { - ctx.answer("pong").send().await?; - Ok(()) + Dispatcher::new(bot) + .messages_handler(|messages: DispatcherHandlerRx| { + messages.for_each_concurrent(None, |message| async move { + message.answer("pong").send().await; + }) }) .dispatch() .await; diff --git a/src/dispatching/ctx_handlers.rs b/src/dispatching/ctx_handlers.rs deleted file mode 100644 index ba6e22bc..00000000 --- a/src/dispatching/ctx_handlers.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::{future::Future, pin::Pin}; - -/// An asynchronous handler of a context. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching). -pub trait CtxHandler { - #[must_use] - fn handle_ctx<'a>( - &'a self, - ctx: Ctx, - ) -> Pin + 'a>> - where - Ctx: 'a; -} - -impl CtxHandler for F -where - F: Fn(Ctx) -> Fut, - Fut: Future, -{ - fn handle_ctx<'a>( - &'a self, - ctx: Ctx, - ) -> Pin + 'a>> - where - Ctx: 'a, - { - Box::pin(async move { self(ctx).await }) - } -} diff --git a/src/dispatching/dialogue/dialogue_dispatcher.rs b/src/dispatching/dialogue/dialogue_dispatcher.rs index 4b5fea97..5aa49978 100644 --- a/src/dispatching/dialogue/dialogue_dispatcher.rs +++ b/src/dispatching/dialogue/dialogue_dispatcher.rs @@ -1,97 +1,319 @@ use crate::dispatching::{ dialogue::{ - DialogueHandlerCtx, DialogueStage, GetChatId, InMemStorage, Storage, + DialogueDispatcherHandler, DialogueDispatcherHandlerCtx, DialogueStage, + GetChatId, InMemStorage, Storage, }, - CtxHandler, DispatcherHandlerCtx, + DispatcherHandler, DispatcherHandlerCtx, }; use std::{future::Future, pin::Pin}; +use futures::StreamExt; +use tokio::sync::mpsc; + +use lockfree::map::Map; +use std::sync::Arc; + /// A dispatcher of dialogues. /// -/// Note that `DialogueDispatcher` implements `CtxHandler`, so you can just put -/// an instance of this dispatcher into the [`Dispatcher`]'s methods. +/// Note that `DialogueDispatcher` implements [`DispatcherHandler`], so you can +/// just put an instance of this dispatcher into the [`Dispatcher`]'s methods. +/// +/// See [the module-level documentation for the design +/// overview](crate::dispatching::dialogue). /// /// [`Dispatcher`]: crate::dispatching::Dispatcher -pub struct DialogueDispatcher<'a, D, H> { - storage: Box + 'a>, - handler: H, +/// [`DispatcherHandler`]: crate::dispatching::DispatcherHandler +pub struct DialogueDispatcher { + storage: Arc + Send + Sync + 'static>, + handler: Arc, + + /// A lock-free map to handle updates from the same chat sequentially, but + /// concurrently from different chats. + /// + /// A value is the TX part of an unbounded asynchronous MPSC channel. A + /// handler that executes updates from the same chat ID sequentially + /// handles the RX part. + senders: Arc>>>, } -impl<'a, D, H> DialogueDispatcher<'a, D, H> +impl DialogueDispatcher where - D: Default + 'a, + H: DialogueDispatcherHandler + Send + Sync + 'static, + Upd: GetChatId + Send + Sync + 'static, + D: Default + Send + Sync + 'static, { /// Creates a dispatcher with the specified `handler` and [`InMemStorage`] /// (a default storage). /// /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage #[must_use] - pub fn new(handler: H) -> Self { - Self { - storage: Box::new(InMemStorage::default()), - handler, - } + pub fn new(handler: H) -> Arc { + Arc::new(Self { + storage: InMemStorage::new(), + handler: Arc::new(handler), + senders: Arc::new(Map::new()), + }) } /// Creates a dispatcher with the specified `handler` and `storage`. #[must_use] - pub fn with_storage(handler: H, storage: Stg) -> Self + pub fn with_storage(handler: H, storage: Arc) -> Arc where - Stg: Storage + 'a, + Stg: Storage + Sync + Send + 'static, { - Self { - storage: Box::new(storage), - handler, - } + Arc::new(Self { + storage, + handler: Arc::new(handler), + senders: Arc::new(Map::new()), + }) + } + + #[must_use] + fn new_tx(&self) -> mpsc::UnboundedSender> { + let (tx, rx) = mpsc::unbounded_channel(); + + let storage = Arc::clone(&self.storage); + let handler = Arc::clone(&self.handler); + let senders = Arc::clone(&self.senders); + + tokio::spawn(rx.for_each(move |ctx: DispatcherHandlerCtx| { + let storage = Arc::clone(&storage); + let handler = Arc::clone(&handler); + let senders = Arc::clone(&senders); + + async move { + let chat_id = ctx.update.chat_id(); + + let dialogue = Arc::clone(&storage) + .remove_dialogue(chat_id) + .await + .unwrap_or_default(); + + match handler + .handle(DialogueDispatcherHandlerCtx { + bot: ctx.bot, + update: ctx.update, + dialogue, + }) + .await + { + DialogueStage::Next(new_dialogue) => { + update_dialogue( + Arc::clone(&storage), + chat_id, + new_dialogue, + ) + .await; + } + DialogueStage::Exit => { + // On the next .poll() call, the spawned future will + // return Poll::Ready, because we are dropping the + // sender right here: + senders.remove(&chat_id); + + // We already removed a dialogue from `storage` (see + // the beginning of this async block). + } + } + } + })); + + tx } } -impl<'a, D, H, Upd> CtxHandler, Result<(), ()>> - for DialogueDispatcher<'a, D, H> -where - H: CtxHandler, DialogueStage>, - Upd: GetChatId, - D: Default, +async fn update_dialogue( + storage: Arc + Send + Sync + 'static>, + chat_id: i64, + new_dialogue: D, +) where + D: 'static + Send + Sync, { - fn handle_ctx<'b>( - &'b self, - ctx: DispatcherHandlerCtx, - ) -> Pin> + 'b>> - where - Upd: 'b, + if storage + .update_dialogue(chat_id, new_dialogue) + .await + .is_some() { - Box::pin(async move { + panic!( + "Oops, you have an bug in your Storage: update_dialogue returns \ + Some after remove_dialogue" + ); + } +} + +impl DispatcherHandler for DialogueDispatcher +where + H: DialogueDispatcherHandler + Send + Sync + 'static, + Upd: GetChatId + Send + Sync + 'static, + D: Default + Send + Sync + 'static, +{ + fn handle<'a>( + &'a self, + updates: mpsc::UnboundedReceiver>, + ) -> Pin + Send + Sync + 'a>> + where + DispatcherHandlerCtx: 'a, + { + Box::pin(updates.for_each(move |ctx| { let chat_id = ctx.update.chat_id(); - let dialogue = self - .storage - .remove_dialogue(chat_id) - .await - .unwrap_or_default(); - - if let DialogueStage::Next(new_dialogue) = self - .handler - .handle_ctx(DialogueHandlerCtx { - bot: ctx.bot, - update: ctx.update, - dialogue, - }) - .await - { - if self - .storage - .update_dialogue(chat_id, new_dialogue) - .await - .is_some() - { - panic!( - "We previously storage.remove_dialogue() so \ - storage.update_dialogue() must return None" - ); + match self.senders.get(&chat_id) { + // An old dialogue + Some(tx) => { + if let Err(_) = tx.1.send(ctx) { + panic!( + "We are not dropping a receiver or call .close() \ + on it", + ); + } + } + None => { + let tx = self.new_tx(); + if let Err(_) = tx.send(ctx) { + panic!( + "We are not dropping a receiver or call .close() \ + on it", + ); + } + self.senders.insert(chat_id, tx); } } - Ok(()) - }) + async { () } + })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::Bot; + use futures::{stream, StreamExt}; + use lazy_static::lazy_static; + use tokio::{ + sync::{mpsc, Mutex}, + time::{delay_for, Duration}, + }; + + #[tokio::test] + async fn updates_from_same_chat_executed_sequentially() { + #[derive(Debug)] + struct MyUpdate { + chat_id: i64, + unique_number: u32, + }; + + impl MyUpdate { + fn new(chat_id: i64, unique_number: u32) -> Self { + Self { + chat_id, + unique_number, + } + } + } + + impl GetChatId for MyUpdate { + fn chat_id(&self) -> i64 { + self.chat_id + } + } + + lazy_static! { + static ref SEQ1: Mutex> = Mutex::new(Vec::new()); + static ref SEQ2: Mutex> = Mutex::new(Vec::new()); + static ref SEQ3: Mutex> = Mutex::new(Vec::new()); + } + + let dispatcher = DialogueDispatcher::new( + |ctx: DialogueDispatcherHandlerCtx| async move { + delay_for(Duration::from_millis(300)).await; + + match ctx.update { + MyUpdate { + chat_id: 1, + unique_number, + } => { + SEQ1.lock().await.push(unique_number); + } + MyUpdate { + chat_id: 2, + unique_number, + } => { + SEQ2.lock().await.push(unique_number); + } + MyUpdate { + chat_id: 3, + unique_number, + } => { + SEQ3.lock().await.push(unique_number); + } + _ => unreachable!(), + } + + DialogueStage::Next(()) + }, + ); + + let updates = stream::iter( + vec![ + MyUpdate::new(1, 174), + MyUpdate::new(1, 125), + MyUpdate::new(2, 411), + MyUpdate::new(1, 2), + MyUpdate::new(2, 515), + MyUpdate::new(2, 623), + MyUpdate::new(1, 193), + MyUpdate::new(1, 104), + MyUpdate::new(2, 2222), + MyUpdate::new(2, 737), + MyUpdate::new(3, 72782), + MyUpdate::new(3, 2737), + MyUpdate::new(1, 7), + MyUpdate::new(1, 7778), + MyUpdate::new(3, 5475), + MyUpdate::new(3, 1096), + MyUpdate::new(3, 872), + MyUpdate::new(2, 10), + MyUpdate::new(2, 55456), + MyUpdate::new(3, 5665), + MyUpdate::new(3, 1611), + ] + .into_iter() + .map(|update| DispatcherHandlerCtx { + update, + bot: Bot::new("Doesn't matter here"), + }) + .collect::>>(), + ); + + let (tx, rx) = mpsc::unbounded_channel(); + + updates + .for_each(move |update| { + let tx = tx.clone(); + + async move { + if let Err(_) = tx.send(update) { + panic!("tx.send(update) failed"); + } + } + }) + .await; + + dispatcher.handle(rx).await; + + // Wait until our futures to be finished. + delay_for(Duration::from_millis(3000)).await; + + assert_eq!(*SEQ1.lock().await, vec![174, 125, 2, 193, 104, 7, 7778]); + assert_eq!( + *SEQ2.lock().await, + vec![411, 515, 623, 2222, 737, 10, 55456] + ); + assert_eq!( + *SEQ3.lock().await, + vec![72782, 2737, 5475, 1096, 872, 5665, 1611] + ); } } diff --git a/src/dispatching/dialogue/dialogue_dispatcher_handler.rs b/src/dispatching/dialogue/dialogue_dispatcher_handler.rs new file mode 100644 index 00000000..83ef3267 --- /dev/null +++ b/src/dispatching/dialogue/dialogue_dispatcher_handler.rs @@ -0,0 +1,36 @@ +use std::pin::Pin; + +use crate::prelude::{DialogueDispatcherHandlerCtx, DialogueStage}; +use std::future::Future; + +/// An asynchronous handler of an update used in [`DialogueDispatcher`]. +/// +/// See [the module-level documentation for the design +/// overview](crate::dispatching::dialogue). +/// +/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher +pub trait DialogueDispatcherHandler { + #[must_use] + fn handle<'a>( + &'a self, + ctx: DialogueDispatcherHandlerCtx, + ) -> Pin> + Send + Sync + 'a>> + where + DialogueDispatcherHandlerCtx: Send + Sync + 'a; +} + +impl DialogueDispatcherHandler for F +where + F: Fn(DialogueDispatcherHandlerCtx) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + Sync + 'static, +{ + fn handle<'a>( + &'a self, + ctx: DialogueDispatcherHandlerCtx, + ) -> Pin + Send + Sync + 'a>> + where + DialogueDispatcherHandlerCtx: Send + Sync + 'a, + { + Box::pin(async move { self(ctx).await }) + } +} diff --git a/src/dispatching/dialogue/dialogue_handler_ctx.rs b/src/dispatching/dialogue/dialogue_dispatcher_handler_ctx.rs similarity index 91% rename from src/dispatching/dialogue/dialogue_handler_ctx.rs rename to src/dispatching/dialogue/dialogue_dispatcher_handler_ctx.rs index 257744aa..d7dc6738 100644 --- a/src/dispatching/dialogue/dialogue_handler_ctx.rs +++ b/src/dispatching/dialogue/dialogue_dispatcher_handler_ctx.rs @@ -13,14 +13,18 @@ use std::sync::Arc; /// A context of a [`DialogueDispatcher`]'s message handler. /// +/// See [the module-level documentation for the design +/// overview](crate::dispatching::dialogue). +/// /// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher -pub struct DialogueHandlerCtx { +#[derive(Debug)] +pub struct DialogueDispatcherHandlerCtx { pub bot: Arc, pub update: Upd, pub dialogue: D, } -impl DialogueHandlerCtx { +impl DialogueDispatcherHandlerCtx { /// Creates a new instance with the provided fields. pub fn new(bot: Arc, update: Upd, dialogue: D) -> Self { Self { @@ -35,8 +39,8 @@ impl DialogueHandlerCtx { pub fn with_new_dialogue( self, new_dialogue: Nd, - ) -> DialogueHandlerCtx { - DialogueHandlerCtx { + ) -> DialogueDispatcherHandlerCtx { + DialogueDispatcherHandlerCtx { bot: self.bot, update: self.update, dialogue: new_dialogue, @@ -44,7 +48,7 @@ impl DialogueHandlerCtx { } } -impl GetChatId for DialogueHandlerCtx +impl GetChatId for DialogueDispatcherHandlerCtx where Upd: GetChatId, { @@ -53,7 +57,7 @@ where } } -impl DialogueHandlerCtx { +impl DialogueDispatcherHandlerCtx { pub fn answer(&self, text: T) -> SendMessage where T: Into, diff --git a/src/dispatching/dialogue/dialogue_stage.rs b/src/dispatching/dialogue/dialogue_stage.rs index afb5a31c..70989bc9 100644 --- a/src/dispatching/dialogue/dialogue_stage.rs +++ b/src/dispatching/dialogue/dialogue_stage.rs @@ -1,4 +1,7 @@ /// Continue or terminate a dialogue. +/// +/// See [the module-level documentation for the design +/// overview](crate::dispatching::dialogue). #[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] pub enum DialogueStage { Next(D), @@ -6,11 +9,17 @@ pub enum DialogueStage { } /// A shortcut for `Ok(DialogueStage::Next(dialogue))`. +/// +/// See [the module-level documentation for the design +/// overview](crate::dispatching::dialogue). pub fn next(dialogue: D) -> Result, E> { Ok(DialogueStage::Next(dialogue)) } /// A shortcut for `Ok(DialogueStage::Exit)`. +/// +/// See [the module-level documentation for the design +/// overview](crate::dispatching::dialogue). pub fn exit() -> Result, E> { Ok(DialogueStage::Exit) } diff --git a/src/dispatching/dialogue/mod.rs b/src/dispatching/dialogue/mod.rs index 2788308d..694f3fd3 100644 --- a/src/dispatching/dialogue/mod.rs +++ b/src/dispatching/dialogue/mod.rs @@ -4,45 +4,54 @@ //! //! 1. Your type `D`, which designates a dialogue state at the current //! moment. -//! 2. [`Storage`], which encapsulates all the dialogues. +//! 2. [`Storage`], which encapsulates all the dialogues. //! 3. Your handler, which receives an update and turns your dialogue into the -//! next state. -//! 4. [`DialogueDispatcher`], which encapsulates your handler, [`Storage`], -//! and implements [`CtxHandler`]. +//! next state ([`DialogueDispatcherHandlerCtx`] -> +//! [`DialogueStage`]). +//! 4. [`DialogueDispatcher`], which encapsulates your handler, [`Storage`], +//! and implements [`DispatcherHandler`]. //! -//! You supply [`DialogueDispatcher`] into [`Dispatcher`]. Every time -//! [`Dispatcher`] calls `DialogueDispatcher::handle_ctx(...)`, the following -//! steps are executed: +//! For example, you supply [`DialogueDispatcher`] into +//! [`Dispatcher::messages_handler`]. Every time [`Dispatcher`] sees an incoming +//! [`UpdateKind::Message(message)`], `message` is transferred into +//! [`DialogueDispatcher`]. After this, following steps are executed: //! //! 1. If a storage doesn't contain a dialogue from this chat, supply -//! `D::default()` into you handler, otherwise, supply the saved session +//! `D::default()` into you handler, otherwise, supply the saved dialogue //! from this chat. -//! 2. If a handler has returned [`DialogueStage::Exit`], remove the session +//! 2. If a handler has returned [`DialogueStage::Exit`], remove the dialogue //! from the storage, otherwise ([`DialogueStage::Next`]) force the storage to -//! update the session. +//! update the dialogue. //! //! Please, see [examples/dialogue_bot] as an example. //! -//! [`Storage`]: crate::dispatching::dialogue::Storage +//! [`Storage`]: crate::dispatching::dialogue::Storage +//! [`DialogueStage`]: crate::dispatching::dialogue::DialogueStage //! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher //! [`DialogueStage::Exit`]: //! crate::dispatching::dialogue::DialogueStage::Exit //! [`DialogueStage::Next`]: crate::dispatching::dialogue::DialogueStage::Next -//! [`CtxHandler`]: crate::dispatching::CtxHandler +//! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler //! [`Dispatcher`]: crate::dispatching::Dispatcher +//! [`Dispatcher::messages_handler`]: +//! crate::dispatching::Dispatcher::messages_handler +//! [`UpdateKind::Message(message)`]: crate::types::UpdateKind::Message +//! [`DialogueDispatcherHandlerCtx`]: +//! crate::dispatching::dialogue::DialogueDispatcherHandlerCtx //! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot -#![allow(clippy::module_inception)] #![allow(clippy::type_complexity)] mod dialogue_dispatcher; -mod dialogue_handler_ctx; +mod dialogue_dispatcher_handler; +mod dialogue_dispatcher_handler_ctx; mod dialogue_stage; mod get_chat_id; mod storage; pub use dialogue_dispatcher::DialogueDispatcher; -pub use dialogue_handler_ctx::DialogueHandlerCtx; +pub use dialogue_dispatcher_handler::DialogueDispatcherHandler; +pub use dialogue_dispatcher_handler_ctx::DialogueDispatcherHandlerCtx; pub use dialogue_stage::{exit, next, DialogueStage}; pub use get_chat_id::GetChatId; pub use storage::{InMemStorage, Storage}; diff --git a/src/dispatching/dialogue/storage/in_mem_storage.rs b/src/dispatching/dialogue/storage/in_mem_storage.rs index bfc1d033..e92a8536 100644 --- a/src/dispatching/dialogue/storage/in_mem_storage.rs +++ b/src/dispatching/dialogue/storage/in_mem_storage.rs @@ -1,7 +1,5 @@ -use async_trait::async_trait; - use super::Storage; -use std::collections::HashMap; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; use tokio::sync::Mutex; /// A memory storage based on a hash map. Stores all the dialogues directly in @@ -11,19 +9,39 @@ use tokio::sync::Mutex; /// All the dialogues will be lost after you restart your bot. If you need to /// store them somewhere on a drive, you need to implement a storage /// communicating with a DB. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct InMemStorage { map: Mutex>, } -#[async_trait(?Send)] -#[async_trait] -impl Storage for InMemStorage { - async fn remove_dialogue(&self, chat_id: i64) -> Option { - self.map.lock().await.remove(&chat_id) - } - - async fn update_dialogue(&self, chat_id: i64, dialogue: D) -> Option { - self.map.lock().await.insert(chat_id, dialogue) +impl InMemStorage { + #[must_use] + pub fn new() -> Arc { + Arc::new(Self { + map: Mutex::new(HashMap::new()), + }) + } +} + +impl Storage for InMemStorage { + fn remove_dialogue( + self: Arc, + chat_id: i64, + ) -> Pin> + Send + Sync + 'static>> + where + D: Send + Sync + 'static, + { + Box::pin(async move { self.map.lock().await.remove(&chat_id) }) + } + + fn update_dialogue( + self: Arc, + chat_id: i64, + dialogue: D, + ) -> Pin> + Send + Sync + 'static>> + where + D: Send + Sync + 'static, + { + Box::pin(async move { self.map.lock().await.insert(chat_id, dialogue) }) } } diff --git a/src/dispatching/dialogue/storage/mod.rs b/src/dispatching/dialogue/storage/mod.rs index f06fbf49..0aea8a9e 100644 --- a/src/dispatching/dialogue/storage/mod.rs +++ b/src/dispatching/dialogue/storage/mod.rs @@ -1,7 +1,7 @@ mod in_mem_storage; -use async_trait::async_trait; pub use in_mem_storage::InMemStorage; +use std::{future::Future, pin::Pin, sync::Arc}; /// A storage of dialogues. /// @@ -11,18 +11,27 @@ pub use in_mem_storage::InMemStorage; /// For a storage based on a simple hash map, see [`InMemStorage`]. /// /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage -#[async_trait(?Send)] -#[async_trait] pub trait Storage { /// Removes a dialogue with the specified `chat_id`. /// /// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a /// `dialogue` was deleted. - async fn remove_dialogue(&self, chat_id: i64) -> Option; + fn remove_dialogue( + self: Arc, + chat_id: i64, + ) -> Pin> + Send + Sync + 'static>> + where + D: Send + Sync + 'static; /// Updates a dialogue with the specified `chat_id`. /// /// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a /// `dialogue` was updated. - async fn update_dialogue(&self, chat_id: i64, dialogue: D) -> Option; + fn update_dialogue( + self: Arc, + chat_id: i64, + dialogue: D, + ) -> Pin> + Send + Sync + 'static>> + where + D: Send + Sync + 'static; } diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 82f16b2d..132ceca1 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,215 +1,205 @@ use crate::{ dispatching::{ error_handlers::ErrorHandler, update_listeners, - update_listeners::UpdateListener, CtxHandler, DispatcherHandlerCtx, - DispatcherHandlerResult, LoggingErrorHandler, + update_listeners::UpdateListener, DispatcherHandler, + DispatcherHandlerCtx, LoggingErrorHandler, }, types::{ CallbackQuery, ChosenInlineResult, InlineQuery, Message, Poll, - PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, + PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind, }, - Bot, RequestError, + Bot, }; -use futures::{stream, StreamExt}; -use std::{fmt::Debug, future::Future, sync::Arc}; +use futures::StreamExt; +use std::{fmt::Debug, sync::Arc}; +use tokio::sync::mpsc; -type Handlers<'a, Upd, HandlerE> = Vec< - Box< - dyn CtxHandler< - DispatcherHandlerCtx, - DispatcherHandlerResult, - > + 'a, - >, ->; +use tokio::sync::Mutex; + +type Tx = Option>>>; + +#[macro_use] +mod macros { + /// Pushes an update to a queue. + macro_rules! send { + ($bot:expr, $tx:expr, $update:expr, $variant:expr) => { + send($bot, $tx, $update, stringify!($variant)).await; + }; + } +} + +async fn send<'a, Upd>( + bot: &'a Arc, + tx: &'a Tx, + update: Upd, + variant: &'static str, +) where + Upd: Debug, +{ + if let Some(tx) = tx { + if let Err(error) = tx.lock().await.send(DispatcherHandlerCtx { + bot: Arc::clone(&bot), + update, + }) { + log::error!( + "The RX part of the {} channel is closed, but an update is \ + received.\nError:{}\n", + variant, + error + ); + } + } +} /// One dispatcher to rule them all. /// /// See [the module-level documentation for the design /// overview](crate::dispatching). -// HandlerE=RequestError doesn't work now, because of very poor type inference. -// See https://github.com/rust-lang/rust/issues/27336 for more details. -pub struct Dispatcher<'a, HandlerE = RequestError> { +pub struct Dispatcher { bot: Arc, - handlers_error_handler: Box + 'a>, - - update_handlers: Handlers<'a, Update, HandlerE>, - message_handlers: Handlers<'a, Message, HandlerE>, - edited_message_handlers: Handlers<'a, Message, HandlerE>, - channel_post_handlers: Handlers<'a, Message, HandlerE>, - edited_channel_post_handlers: Handlers<'a, Message, HandlerE>, - inline_query_handlers: Handlers<'a, InlineQuery, HandlerE>, - chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, HandlerE>, - callback_query_handlers: Handlers<'a, CallbackQuery, HandlerE>, - shipping_query_handlers: Handlers<'a, ShippingQuery, HandlerE>, - pre_checkout_query_handlers: Handlers<'a, PreCheckoutQuery, HandlerE>, - poll_handlers: Handlers<'a, Poll, HandlerE>, - poll_answer_handlers: Handlers<'a, PollAnswer, HandlerE>, + messages_queue: Tx, + edited_messages_queue: Tx, + channel_posts_queue: Tx, + edited_channel_posts_queue: Tx, + inline_queries_queue: Tx, + chosen_inline_results_queue: Tx, + callback_queries_queue: Tx, + shipping_queries_queue: Tx, + pre_checkout_queries_queue: Tx, + polls_queue: Tx, + poll_answers_queue: Tx, } -impl<'a, HandlerE> Dispatcher<'a, HandlerE> -where - HandlerE: Debug + 'a, -{ - /// Constructs a new dispatcher with this `bot`. +impl Dispatcher { + /// Constructs a new dispatcher with the specified `bot`. #[must_use] pub fn new(bot: Arc) -> Self { Self { bot, - handlers_error_handler: Box::new(LoggingErrorHandler::new( - "An error from a Dispatcher's handler", - )), - update_handlers: Vec::new(), - message_handlers: Vec::new(), - edited_message_handlers: Vec::new(), - channel_post_handlers: Vec::new(), - edited_channel_post_handlers: Vec::new(), - inline_query_handlers: Vec::new(), - chosen_inline_result_handlers: Vec::new(), - callback_query_handlers: Vec::new(), - shipping_query_handlers: Vec::new(), - pre_checkout_query_handlers: Vec::new(), - poll_handlers: Vec::new(), - poll_answer_handlers: Vec::new(), + messages_queue: None, + edited_messages_queue: None, + channel_posts_queue: None, + edited_channel_posts_queue: None, + inline_queries_queue: None, + chosen_inline_results_queue: None, + callback_queries_queue: None, + shipping_queries_queue: None, + pre_checkout_queries_queue: None, + polls_queue: None, + poll_answers_queue: None, } } - /// Registers a handler of errors, produced by other handlers. #[must_use] - pub fn handlers_error_handler(mut self, val: T) -> Self + fn new_tx(&self, h: H) -> Tx where - T: ErrorHandler + 'a, + H: DispatcherHandler + Send + Sync + 'static, + Upd: Send + Sync + 'static, { - self.handlers_error_handler = Box::new(val); + let (tx, rx) = mpsc::unbounded_channel(); + tokio::spawn(async move { + h.handle(rx).await; + }); + Some(Mutex::new(tx)) + } + + #[must_use] + pub fn messages_handler(mut self, h: H) -> Self + where + H: DispatcherHandler + 'static + Send + Sync, + { + self.messages_queue = self.new_tx(h); self } #[must_use] - pub fn update_handler(mut self, h: &'a H) -> Self + pub fn edited_messages_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.update_handlers = register_handler(self.update_handlers, h); + self.edited_messages_queue = self.new_tx(h); self } #[must_use] - pub fn message_handler(mut self, h: &'a H) -> Self + pub fn channel_posts_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.message_handlers = register_handler(self.message_handlers, h); + self.channel_posts_queue = self.new_tx(h); self } #[must_use] - pub fn edited_message_handler(mut self, h: &'a H) -> Self + pub fn edited_channel_posts_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.edited_message_handlers = - register_handler(self.edited_message_handlers, h); + self.edited_channel_posts_queue = self.new_tx(h); self } #[must_use] - pub fn channel_post_handler(mut self, h: &'a H) -> Self + pub fn inline_queries_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.channel_post_handlers = - register_handler(self.channel_post_handlers, h); + self.inline_queries_queue = self.new_tx(h); self } #[must_use] - pub fn edited_channel_post_handler(mut self, h: &'a H) -> Self + pub fn chosen_inline_results_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.edited_channel_post_handlers = - register_handler(self.edited_channel_post_handlers, h); + self.chosen_inline_results_queue = self.new_tx(h); self } #[must_use] - pub fn inline_query_handler(mut self, h: &'a H) -> Self + pub fn callback_queries_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.inline_query_handlers = - register_handler(self.inline_query_handlers, h); + self.callback_queries_queue = self.new_tx(h); self } #[must_use] - pub fn chosen_inline_result_handler(mut self, h: &'a H) -> Self + pub fn shipping_queries_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.chosen_inline_result_handlers = - register_handler(self.chosen_inline_result_handlers, h); + self.shipping_queries_queue = self.new_tx(h); self } #[must_use] - pub fn callback_query_handler(mut self, h: &'a H) -> Self + pub fn pre_checkout_queries_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.callback_query_handlers = - register_handler(self.callback_query_handlers, h); + self.pre_checkout_queries_queue = self.new_tx(h); self } #[must_use] - pub fn shipping_query_handler(mut self, h: &'a H) -> Self + pub fn polls_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.shipping_query_handlers = - register_handler(self.shipping_query_handlers, h); + self.polls_queue = self.new_tx(h); self } #[must_use] - pub fn pre_checkout_query_handler(mut self, h: &'a H) -> Self + pub fn poll_answers_handler(mut self, h: H) -> Self where - H: CtxHandler, I> + 'a, - I: Into> + 'a, + H: DispatcherHandler + 'static + Send + Sync, { - self.pre_checkout_query_handlers = - register_handler(self.pre_checkout_query_handlers, h); - self - } - - #[must_use] - pub fn poll_handler(mut self, h: &'a H) -> Self - where - H: CtxHandler, I> + 'a, - I: Into> + 'a, - { - self.poll_handlers = register_handler(self.poll_handlers, h); - self - } - - #[must_use] - pub fn poll_answer_handler(mut self, h: &'a H) -> Self - where - H: CtxHandler, I> + 'a, - I: Into> + 'a, - { - self.poll_answer_handlers = - register_handler(self.poll_answer_handlers, h); + self.poll_answers_queue = self.new_tx(h); self } @@ -217,7 +207,7 @@ where /// /// The default parameters are a long polling update listener and log all /// errors produced by this listener). - pub async fn dispatch(&'a self) { + pub async fn dispatch(&self) { self.dispatch_with_listener( update_listeners::polling_default(Arc::clone(&self.bot)), &LoggingErrorHandler::new("An error from the update listener"), @@ -227,7 +217,7 @@ where /// Starts your bot with custom `update_listener` and /// `update_listener_error_handler`. - pub async fn dispatch_with_listener( + pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( &'a self, update_listener: UListener, update_listener_error_handler: &'a Eh, @@ -239,7 +229,7 @@ where let update_listener = Box::pin(update_listener); update_listener - .for_each_concurrent(None, move |update| async move { + .for_each(move |update| async move { log::trace!("Dispatcher received an update: {:?}", update); let update = match update { @@ -250,132 +240,97 @@ where } }; - let update = - match self.handle(&self.update_handlers, update).await { - Some(update) => update, - None => return, - }; - match update.kind { UpdateKind::Message(message) => { - self.handle(&self.message_handlers, message).await; + send!( + &self.bot, + &self.messages_queue, + message, + UpdateKind::Message + ); } UpdateKind::EditedMessage(message) => { - self.handle(&self.edited_message_handlers, message) - .await; + send!( + &self.bot, + &self.edited_messages_queue, + message, + UpdateKind::EditedMessage + ); } UpdateKind::ChannelPost(post) => { - self.handle(&self.channel_post_handlers, post).await; + send!( + &self.bot, + &self.channel_posts_queue, + post, + UpdateKind::ChannelPost + ); } UpdateKind::EditedChannelPost(post) => { - self.handle(&self.edited_channel_post_handlers, post) - .await; + send!( + &self.bot, + &self.edited_channel_posts_queue, + post, + UpdateKind::EditedChannelPost + ); } UpdateKind::InlineQuery(query) => { - self.handle(&self.inline_query_handlers, query).await; + send!( + &self.bot, + &self.inline_queries_queue, + query, + UpdateKind::InlineQuery + ); } UpdateKind::ChosenInlineResult(result) => { - self.handle( - &self.chosen_inline_result_handlers, + send!( + &self.bot, + &self.chosen_inline_results_queue, result, - ) - .await; + UpdateKind::ChosenInlineResult + ); } UpdateKind::CallbackQuery(query) => { - self.handle(&self.callback_query_handlers, query).await; + send!( + &self.bot, + &self.callback_queries_queue, + query, + UpdateKind::CallbackQuer + ); } UpdateKind::ShippingQuery(query) => { - self.handle(&self.shipping_query_handlers, query).await; + send!( + &self.bot, + &self.shipping_queries_queue, + query, + UpdateKind::ShippingQuery + ); } UpdateKind::PreCheckoutQuery(query) => { - self.handle(&self.pre_checkout_query_handlers, query) - .await; + send!( + &self.bot, + &self.pre_checkout_queries_queue, + query, + UpdateKind::PreCheckoutQuery + ); } UpdateKind::Poll(poll) => { - self.handle(&self.poll_handlers, poll).await; + send!( + &self.bot, + &self.polls_queue, + poll, + UpdateKind::Poll + ); } UpdateKind::PollAnswer(answer) => { - self.handle(&self.poll_answer_handlers, answer).await; - } - } - }) - .await - } - - // Handles a single update. - #[allow(clippy::ptr_arg)] - async fn handle( - &self, - handlers: &Handlers<'a, Upd, HandlerE>, - update: Upd, - ) -> Option { - stream::iter(handlers) - .fold(Some(update), |acc, handler| { - async move { - // Option::and_then is not working here, because - // Middleware::handle is asynchronous. - match acc { - Some(update) => { - let DispatcherHandlerResult { next, result } = - handler - .handle_ctx(DispatcherHandlerCtx { - bot: Arc::clone(&self.bot), - update, - }) - .await; - - if let Err(error) = result { - self.handlers_error_handler - .handle_error(error) - .await - } - - next - } - None => None, + send!( + &self.bot, + &self.poll_answers_queue, + answer, + UpdateKind::PollAnswer + ); } } }) .await } } - -/// Transforms Future into Future by applying an Into -/// conversion. -async fn intermediate_fut0(fut: impl Future) -> U -where - T: Into, -{ - fut.await.into() -} - -/// Transforms CtxHandler with Into> as a return -/// value into CtxHandler with DispatcherHandlerResult return value. -fn intermediate_fut1<'a, Upd, HandlerE, H, I>( - h: &'a H, -) -> impl CtxHandler< - DispatcherHandlerCtx, - DispatcherHandlerResult, -> + 'a -where - H: CtxHandler, I> + 'a, - I: Into> + 'a, - Upd: 'a, -{ - move |ctx| intermediate_fut0(h.handle_ctx(ctx)) -} - -/// Registers a single handler. -fn register_handler<'a, Upd, H, I, HandlerE>( - mut handlers: Handlers<'a, Upd, HandlerE>, - h: &'a H, -) -> Handlers<'a, Upd, HandlerE> -where - H: CtxHandler, I> + 'a, - I: Into> + 'a, - HandlerE: 'a, - Upd: 'a, -{ - handlers.push(Box::new(intermediate_fut1(h))); - handlers -} diff --git a/src/dispatching/dispatcher_handler.rs b/src/dispatching/dispatcher_handler.rs new file mode 100644 index 00000000..8152d466 --- /dev/null +++ b/src/dispatching/dispatcher_handler.rs @@ -0,0 +1,35 @@ +use std::{future::Future, pin::Pin}; + +use crate::dispatching::{DispatcherHandlerCtx, DispatcherHandlerRx}; + +/// An asynchronous handler of a stream of updates used in [`Dispatcher`]. +/// +/// See [the module-level documentation for the design +/// overview](crate::dispatching). +/// +/// [`Dispatcher`]: crate::dispatching::Dispatcher +pub trait DispatcherHandler { + #[must_use] + fn handle<'a>( + &'a self, + updates: DispatcherHandlerRx, + ) -> Pin + Send + Sync + 'a>> + where + DispatcherHandlerCtx: Send + Sync + 'a; +} + +impl DispatcherHandler for F +where + F: Fn(DispatcherHandlerRx) -> Fut + Send + Sync + Sync + 'static, + Fut: Future + Send + Sync + 'static, +{ + fn handle<'a>( + &'a self, + updates: DispatcherHandlerRx, + ) -> Pin + Send + Sync + 'a>> + where + DispatcherHandlerCtx: Send + Sync + 'a, + { + Box::pin(async move { self(updates).await }) + } +} diff --git a/src/dispatching/dispatcher_handler_ctx.rs b/src/dispatching/dispatcher_handler_ctx.rs index ccda15ff..94cf1c95 100644 --- a/src/dispatching/dispatcher_handler_ctx.rs +++ b/src/dispatching/dispatcher_handler_ctx.rs @@ -17,6 +17,7 @@ use std::sync::Arc; /// overview](crate::dispatching). /// /// [`Dispatcher`]: crate::dispatching::Dispatcher +#[derive(Debug)] pub struct DispatcherHandlerCtx { pub bot: Arc, pub update: Upd, diff --git a/src/dispatching/dispatcher_handler_result.rs b/src/dispatching/dispatcher_handler_result.rs deleted file mode 100644 index d7cacf22..00000000 --- a/src/dispatching/dispatcher_handler_result.rs +++ /dev/null @@ -1,31 +0,0 @@ -/// A result of a handler in [`Dispatcher`]. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching). -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -pub struct DispatcherHandlerResult { - pub next: Option, - pub result: Result<(), E>, -} - -impl DispatcherHandlerResult { - /// Creates new `DispatcherHandlerResult` that continues the pipeline. - pub fn next(update: Upd, result: Result<(), E>) -> Self { - Self { - next: Some(update), - result, - } - } - - /// Creates new `DispatcherHandlerResult` that terminates the pipeline. - pub fn exit(result: Result<(), E>) -> Self { - Self { next: None, result } - } -} - -impl From> for DispatcherHandlerResult { - fn from(result: Result<(), E>) -> Self { - Self::exit(result) - } -} diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index d60fd5e0..dd6ce6a0 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -1,120 +1,72 @@ //! Updates dispatching. //! -//! The key type here is [`Dispatcher`]. It encapsulates [`Bot`], handlers for -//! [11 update kinds] (+ for [`Update`]) and [`ErrorHandler`] for them. When -//! [`Update`] is received from Telegram, the following steps are executed: +//! The key type here is [`Dispatcher`]. It encapsulates [`Bot`] and handlers +//! for [the 11 update kinds]. //! -//! 1. It is supplied into an appropriate handler (the first ones is those who -//! accept [`Update`]). -//! 2. If a handler failed, invoke [`ErrorHandler`] with the corresponding -//! error. -//! 3. If a handler has returned [`DispatcherHandlerResult`] with `None`, -//! terminate the pipeline, otherwise supply an update into the next handler -//! (back to step 1). +//! You can register a maximum of 11 handlers for [the 11 update kinds]. Every +//! handler accept [`tokio::sync::mpsc::UnboundedReceiver`] (the RX halve of an +//! asynchronous unbounded MPSC channel). Inside a body of your handler, you +//! typically asynchronously concurrently iterate through updates like this: //! -//! The pipeline is executed until either all the registered handlers were -//! executed, or one of handlers has terminated the pipeline. That's simple! +//! ``` +//! use teloxide::prelude::*; //! -//! 1. Note that handlers implement [`CtxHandler`], which means that you are -//! able to supply [`DialogueDispatcher`] as a handler, since it implements -//! [`CtxHandler`] too! -//! 2. Note that you don't always need to return [`DispatcherHandlerResult`] -//! explicitly, because of automatic conversions. Just return `Result<(), E>` if -//! you want to terminate the pipeline (see the example below). +//! async fn handle_messages(rx: DispatcherHandlerRx) { +//! rx.for_each_concurrent(None, |message| async move { +//! dbg!(message); +//! }) +//! .await; +//! } +//! ``` +//! +//! When [`Update`] is received from Telegram, [`Dispatcher`] pushes it into an +//! appropriate handler. That's simple! +//! +//! **Note** that handlers must implement [`DispatcherHandler`], which means +//! that: +//! - You are able to supply [`DialogueDispatcher`] as a handler. +//! - You are able to supply functions that accept +//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future::new(Bot::from_env()) -//! .message_handler(&|ctx: DispatcherHandlerCtx| async move { -//! ctx.answer("pong").send().await?; -//! Ok(()) -//! }) -//! .dispatch() -//! .await; -//! # } -//! ``` -//! //! [Full](https://github.com/teloxide/teloxide/blob/master/examples/ping_pong_bot/) //! -//! ### Multiple handlers -//! -//! ```no_run -//! # #[tokio::main] -//! # async fn main_() { -//! use teloxide::prelude::*; -//! -//! // Create a dispatcher with multiple handlers of different types. This will -//! // print One! and Two! on every incoming UpdateKind::Message. -//! Dispatcher::::new(Bot::from_env()) -//! // This is the first UpdateKind::Message handler, which will be called -//! // after the Update handler below. -//! .message_handler(&|ctx: DispatcherHandlerCtx| async move { -//! log::info!("Two!"); -//! DispatcherHandlerResult::next(ctx.update, Ok(())) -//! }) -//! // Remember: handler of Update are called first. -//! .update_handler(&|ctx: DispatcherHandlerCtx| async move { -//! log::info!("One!"); -//! DispatcherHandlerResult::next(ctx.update, Ok(())) -//! }) -//! // This handler will be called right after the first UpdateKind::Message -//! // handler, because it is registered after. -//! .message_handler(&|_ctx: DispatcherHandlerCtx| async move { -//! // The same as DispatcherHandlerResult::exit(Ok(())) -//! Ok(()) -//! }) -//! // This handler will never be called, because the UpdateKind::Message -//! // handler above terminates the pipeline. -//! .message_handler(&|ctx: DispatcherHandlerCtx| async move { -//! log::info!("This will never be printed!"); -//! DispatcherHandlerResult::next(ctx.update, Ok(())) -//! }) -//! .dispatch() -//! .await; -//! -//! // Note: if this bot receive, for example, UpdateKind::ChannelPost, it will -//! // only print "One!", because the UpdateKind::Message handlers will not be -//! // called. -//! # } -//! ``` -//! -//! [Full](https://github.com/teloxide/teloxide/blob/master/examples/miltiple_handlers_bot/) -//! //! For a bit more complicated example, please see [examples/dialogue_bot]. //! //! [`Dispatcher`]: crate::dispatching::Dispatcher -//! [11 update kinds]: crate::types::UpdateKind +//! [the 11 update kinds]: crate::types::UpdateKind //! [`Update`]: crate::types::Update //! [`ErrorHandler`]: crate::dispatching::ErrorHandler -//! [`CtxHandler`]: crate::dispatching::CtxHandler +//! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler //! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher //! [`DispatcherHandlerResult`]: crate::dispatching::DispatcherHandlerResult //! [`Bot`]: crate::Bot +//! [`tokio::sync::mpsc::UnboundedReceiver`]: https://docs.rs/tokio/0.2.11/tokio/sync/mpsc/struct.UnboundedReceiver.html //! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot -mod ctx_handlers; pub mod dialogue; mod dispatcher; +mod dispatcher_handler; mod dispatcher_handler_ctx; -mod dispatcher_handler_result; mod error_handlers; pub mod update_listeners; -pub use ctx_handlers::CtxHandler; pub use dispatcher::Dispatcher; +pub use dispatcher_handler::DispatcherHandler; pub use dispatcher_handler_ctx::DispatcherHandlerCtx; -pub use dispatcher_handler_result::DispatcherHandlerResult; pub use error_handlers::{ ErrorHandler, IgnoringErrorHandler, IgnoringErrorHandlerSafe, LoggingErrorHandler, }; +use tokio::sync::mpsc::UnboundedReceiver; + +/// A type of a stream, consumed by [`Dispatcher`]'s handlers. +/// +/// [`Dispatcher`]: crate::dispatching::Dispatcher +pub type DispatcherHandlerRx = + UnboundedReceiver>; diff --git a/src/lib.rs b/src/lib.rs index 1cee23c2..65ee15ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,203 +45,6 @@ //! pretty_env_logger = "0.4.0" //! ``` //! -//! ## The ping-pong bot -//! This bot has a single message handler, which answers "pong" to each incoming -//! message: -//! -//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/ping_pong_bot/src/main.rs)) -//! ```rust,no_run -//! use teloxide::prelude::*; -//! -//! # #[tokio::main] -//! # async fn main() { -//! teloxide::enable_logging!(); -//! log::info!("Starting the ping-pong bot!"); -//! -//! let bot = Bot::from_env(); -//! -//! Dispatcher::::new(bot) -//! .message_handler(&|ctx: DispatcherHandlerCtx| async move { -//! ctx.answer("pong").send().await?; -//! Ok(()) -//! }) -//! .dispatch() -//! .await; -//! # } -//! ``` -//! -//!
-//! -//!
-//! -//! ## Commands -//! Commands are defined similar to how we define CLI using [structopt]. This -//! bot says "I am a cat! Meow!" on `/meow`, generates a random number within -//! [0; 1) on `/generate`, and shows the usage guide on `/help`: -//! -//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/simple_commands_bot/src/main.rs)) -//! ```rust,no_run -//! # use teloxide::{prelude::*, utils::command::BotCommand}; -//! # use rand::{thread_rng, Rng}; -//! // Imports are omitted... -//! -//! #[derive(BotCommand)] -//! #[command( -//! rename = "lowercase", -//! description = "These commands are supported:" -//! )] -//! enum Command { -//! #[command(description = "display this text.")] -//! Help, -//! #[command(description = "be a cat.")] -//! Meow, -//! #[command(description = "generate a random number within [0; 1).")] -//! Generate, -//! } -//! -//! async fn handle_command( -//! ctx: DispatcherHandlerCtx, -//! ) -> Result<(), RequestError> { -//! let text = match ctx.update.text() { -//! Some(text) => text, -//! None => { -//! log::info!("Received a message, but not text."); -//! return Ok(()); -//! } -//! }; -//! -//! let command = match Command::parse(text) { -//! Some((command, _)) => command, -//! None => { -//! log::info!("Received a text message, but not a command."); -//! return Ok(()); -//! } -//! }; -//! -//! match command { -//! Command::Help => ctx.answer(Command::descriptions()).send().await?, -//! Command::Generate => { -//! ctx.answer(thread_rng().gen_range(0.0, 1.0).to_string()) -//! .send() -//! .await? -//! } -//! Command::Meow => ctx.answer("I am a cat! Meow!").send().await?, -//! }; -//! -//! Ok(()) -//! } -//! -//! #[tokio::main] -//! async fn main() { -//! // Setup is omitted... -//! # teloxide::enable_logging!(); -//! # log::info!("Starting simple_commands_bot!"); -//! # -//! # let bot = Bot::from_env(); -//! # -//! # Dispatcher::::new(bot) -//! # .message_handler(&handle_command) -//! # .dispatch() -//! # .await; -//! } -//! ``` -//! -//!
-//! -//!
-//! -//! ## Guess a number -//! Wanna see more? This is a bot, which starts a game on each incoming message. -//! You must guess a number from 1 to 10 (inclusively): -//! -//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/guess_a_number_bot/src/main.rs)) -//! ```rust,no_run -//! # #[macro_use] -//! # extern crate smart_default; -//! # use teloxide::prelude::*; -//! # use rand::{thread_rng, Rng}; -//! // Imports are omitted... -//! -//! #[derive(SmartDefault)] -//! enum Dialogue { -//! #[default] -//! Start, -//! ReceiveAttempt(u8), -//! } -//! async fn handle_message( -//! ctx: DialogueHandlerCtx, -//! ) -> Result, RequestError> { -//! match ctx.dialogue { -//! Dialogue::Start => { -//! ctx.answer( -//! "Let's play a game! Guess a number from 1 to 10 -//! (inclusively).", -//! ) -//! .send() -//! .await?; -//! next(Dialogue::ReceiveAttempt(thread_rng().gen_range(1, 11))) -//! } -//! Dialogue::ReceiveAttempt(secret) => match ctx.update.text() { -//! None => { -//! ctx.answer("Oh, please, send me a text message!") -//! .send() -//! .await?; -//! next(ctx.dialogue) -//! } -//! Some(text) => match text.parse::() { -//! Ok(attempt) => match attempt { -//! x if !(1..=10).contains(&x) => { -//! ctx.answer( -//! "Oh, please, send me a number in the range \ -//! [1; 10]!", -//! ) -//! .send() -//! .await?; -//! next(ctx.dialogue) -//! } -//! x if x == secret => { -//! ctx.answer("Congratulations! You won!") -//! .send() -//! .await?; -//! exit() -//! } -//! _ => { -//! ctx.answer("No.").send().await?; -//! next(ctx.dialogue) -//! } -//! }, -//! Err(_) => { -//! ctx.answer( -//! "Oh, please, send me a number in the range [1; \ -//! 10]!", -//! ) -//! .send() -//! .await?; -//! next(ctx.dialogue) -//! } -//! }, -//! }, -//! } -//! } -//! -//! #[tokio::main] -//! async fn main() { -//! # teloxide::enable_logging!(); -//! # log::info!("Starting guess_a_number_bot!"); -//! # let bot = Bot::from_env(); -//! // Setup is omitted... -//! -//! Dispatcher::new(bot) -//! .message_handler(&DialogueDispatcher::new(|ctx| async move { -//! handle_message(ctx) -//! .await -//! .expect("Something wrong with the bot!") -//! })) -//! .dispatch() -//! .await; -//! } -//! ``` -//! //!
//! //!
diff --git a/src/prelude.rs b/src/prelude.rs index 97b01f16..37d4e99e 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,12 +3,16 @@ pub use crate::{ dispatching::{ dialogue::{ - exit, next, DialogueDispatcher, DialogueHandlerCtx, DialogueStage, - GetChatId, + exit, next, DialogueDispatcher, DialogueDispatcherHandlerCtx, + DialogueStage, GetChatId, }, - Dispatcher, DispatcherHandlerCtx, DispatcherHandlerResult, + Dispatcher, DispatcherHandlerCtx, DispatcherHandlerRx, }, requests::{Request, ResponseResult}, types::{Message, Update}, Bot, RequestError, }; + +pub use tokio::sync::mpsc::UnboundedReceiver; + +pub use futures::StreamExt;