diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b0dd6fc..303f88b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## unreleased +### Removed + + - The old dispatching system and related stuff: `dispatching`, `utils::UpState`, `prelude`, `repls2`, `crate::{dialogues_repl, dialogues_repl_with_listener}`, and `#[teloxide(...)]`. + +### Changed + + - Rename `dispatching2` => `dispatching`. + - Rename `prelude2` => `prelude`. + - Move `update_listeners`, `stop_token`, `IdleShutdownError`, and `ShutdownToken` from the old `dispatching` to the new `dispatching`. + - Replace `crate::{commands_repl, commands_repl_with_listener, repl, repl_with_listener}` with those of the new `dispatching`. + ## 0.7.2 - 2022-03-23 ### Added diff --git a/Cargo.toml b/Cargo.toml index 8928132d..bc0a3138 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,16 +13,13 @@ exclude = ["media"] [features] # FIXME: remove "cache-me" that was added by mistake here -default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send", "dispatching2", "cache-me"] - -dispatching2 = ["dptree", "cache-me"] +default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send", "cache-me"] sqlite-storage = ["sqlx"] redis-storage = ["redis"] cbor-serializer = ["serde_cbor"] bincode-serializer = ["bincode"] -frunk- = ["frunk"] macros = ["teloxide-macros"] ctrlc_handler = ["tokio/signal"] @@ -44,7 +41,6 @@ full = [ "redis-storage", "cbor-serializer", "bincode-serializer", - "frunk", "macros", "ctrlc_handler", "teloxide-core/full", @@ -59,28 +55,26 @@ full = [ [dependencies] teloxide-core = { version = "0.4", default-features = false } -teloxide-macros = { version = "0.5.1", optional = true } +teloxide-macros = { git = "https://github.com/teloxide/teloxide-macros.git", rev = "144eb73aaf39145bf8f6b57eec5c76730961c2f1", optional = true } serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -dptree = { version = "0.1.0", optional = true } +dptree = { version = "0.1.0" } tokio = { version = "1.8", features = ["fs"] } tokio-util = "0.6" -tokio-stream = "0.1" -flurry = "0.3" log = "0.4" bytes = "1.0" mime = "0.3" derive_more = "0.99" thiserror = "1.0" -async-trait = "0.1" futures = "0.3.15" pin-project = "1.0" serde_with_macros = "1.4" +aquamarine = "0.1.11" sqlx = { version = "0.5", optional = true, default-features = false, features = [ "runtime-tokio-native-tls", @@ -90,15 +84,11 @@ sqlx = { version = "0.5", optional = true, default-features = false, features = redis = { version = "0.20", features = ["tokio-comp"], optional = true } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } -frunk = { version = "0.4", optional = true } -aquamarine = "0.1.11" [dev-dependencies] -smart-default = "0.6.0" rand = "0.8.3" pretty_env_logger = "0.4.0" once_cell = "1.9.0" -lazy_static = "1.4.0" anyhow = "1.0.52" serde = "1" serde_json = "1" @@ -106,6 +96,7 @@ tokio = { version = "1.8", features = ["fs", "rt-multi-thread", "macros"] } warp = "0.3.0" reqwest = "0.10.4" chrono = "0.4" +tokio-stream = "0.1" [package.metadata.docs.rs] all-features = true @@ -149,5 +140,5 @@ name = "admin" required-features = ["macros"] [[example]] -name = "dispatching2_features" +name = "dispatching_features" required-features = ["macros"] diff --git a/README.md b/README.md index 7acff473..f041ac05 100644 --- a/README.md +++ b/README.md @@ -88,7 +88,7 @@ This bot replies with a dice throw to each received message: ([Full](examples/dices.rs)) ```rust,no_run -use teloxide::prelude2::*; +use teloxide::prelude::*; #[tokio::main] async fn main() { @@ -97,7 +97,7 @@ async fn main() { let bot = Bot::from_env().auto_send(); - teloxide::repls2::repl(bot, |message: Message, bot: AutoSend| async move { + teloxide::repl(bot, |message: Message, bot: AutoSend| async move { bot.send_dice(message.chat.id).await?; respond(()) }) @@ -125,10 +125,20 @@ Commands are strongly typed and defined declaratively, similar to how we define ([Full](examples/simple_commands.rs)) ```rust,no_run -use teloxide::{prelude2::*, utils::command::BotCommand}; +use teloxide::{prelude::*, utils::command::BotCommand}; use std::error::Error; +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + log::info!("Starting simple_commands_bot..."); + + let bot = Bot::from_env().auto_send(); + + teloxide::commands_repl(bot, answer, Command::ty()).await; +} + #[derive(BotCommand, Clone)] #[command(rename = "lowercase", description = "These commands are supported:")] enum Command { @@ -161,16 +171,6 @@ async fn answer( Ok(()) } - -#[tokio::main] -async fn main() { - pretty_env_logger::init(); - log::info!("Starting simple_commands_bot..."); - - let bot = Bot::from_env().auto_send(); - - teloxide::repls2::commands_repl(bot, answer, Command::ty()).await; -} ```
@@ -190,7 +190,7 @@ Below is a bot that asks you three questions and then sends the answers back to ([Full](examples/dialogue.rs)) ```rust,ignore -use teloxide::{dispatching2::dialogue::InMemStorage, macros::DialogueState, prelude2::*}; +use teloxide::{dispatching::dialogue::InMemStorage, macros::DialogueState, prelude::*}; type MyDialogue = Dialogue>; diff --git a/examples/admin.rs b/examples/admin.rs index bf257aa9..194b8b54 100644 --- a/examples/admin.rs +++ b/examples/admin.rs @@ -1,7 +1,7 @@ use std::{error::Error, str::FromStr}; use chrono::Duration; -use teloxide::{prelude2::*, types::ChatPermissions, utils::command::BotCommand}; +use teloxide::{prelude::*, types::ChatPermissions, utils::command::BotCommand}; // Derive BotCommand to parse text with a command into this enumeration. // @@ -149,5 +149,5 @@ async fn main() { let bot = teloxide::Bot::from_env().auto_send(); - teloxide::repls2::commands_repl(bot, action, Command::ty()).await; + teloxide::commands_repl(bot, action, Command::ty()).await; } diff --git a/examples/buttons.rs b/examples/buttons.rs index cb98eeec..fa49e1de 100644 --- a/examples/buttons.rs +++ b/examples/buttons.rs @@ -1,7 +1,7 @@ use std::error::Error; use teloxide::{ payloads::SendMessageSetters, - prelude2::*, + prelude::*, types::{ InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputMessageContent, InputMessageContentText, diff --git a/examples/db_remember.rs b/examples/db_remember.rs index 8a52318e..589e0578 100644 --- a/examples/db_remember.rs +++ b/examples/db_remember.rs @@ -2,12 +2,12 @@ // Otherwise, the default is Sqlite. use teloxide::{ - dispatching2::dialogue::{ + dispatching::dialogue::{ serializer::{Bincode, Json}, ErasedStorage, RedisStorage, SqliteStorage, Storage, }, macros::DialogueState, - prelude2::*, + prelude::*, types::Me, utils::command::BotCommand, }; diff --git a/examples/dialogue.rs b/examples/dialogue.rs index 449036bc..5aa3b142 100644 --- a/examples/dialogue.rs +++ b/examples/dialogue.rs @@ -13,7 +13,7 @@ // Age: 223 // Location: Middle-earth // ``` -use teloxide::{dispatching2::dialogue::InMemStorage, macros::DialogueState, prelude2::*}; +use teloxide::{dispatching::dialogue::InMemStorage, macros::DialogueState, prelude::*}; type MyDialogue = Dialogue>; diff --git a/examples/dices.rs b/examples/dices.rs index ea9031c4..a91a7ea4 100644 --- a/examples/dices.rs +++ b/examples/dices.rs @@ -1,6 +1,6 @@ // This bot throws a dice on each incoming message. -use teloxide::prelude2::*; +use teloxide::prelude::*; #[tokio::main] async fn main() { @@ -9,7 +9,7 @@ async fn main() { let bot = Bot::from_env().auto_send(); - teloxide::repls2::repl(bot, |message: Message, bot: AutoSend| async move { + teloxide::repl(bot, |message: Message, bot: AutoSend| async move { bot.send_dice(message.chat.id).await?; respond(()) }) diff --git a/examples/dispatching2_features.rs b/examples/dispatching_features.rs similarity index 95% rename from examples/dispatching2_features.rs rename to examples/dispatching_features.rs index 6f5d1e07..ce56e36c 100644 --- a/examples/dispatching2_features.rs +++ b/examples/dispatching_features.rs @@ -1,12 +1,10 @@ // This example provide a quick overview of the new features in the -// `dispatching2` module. +// `dispatching` module. use rand::Rng; -// You need to import `prelude2` because `prelude` contains items from the old -// dispatching system, which will be deprecated in the future. use teloxide::{ - prelude2::*, + prelude::*, types::{Dice, Update}, utils::command::BotCommand, }; @@ -14,7 +12,7 @@ use teloxide::{ #[tokio::main] async fn main() { pretty_env_logger::init(); - log::info!("Starting dispatching2_features_bot..."); + log::info!("Starting dispatching_features_bot..."); let bot = Bot::from_env().auto_send(); diff --git a/examples/heroku_ping_pong.rs b/examples/heroku_ping_pong.rs index 3deb1ad0..02db7f1a 100644 --- a/examples/heroku_ping_pong.rs +++ b/examples/heroku_ping_pong.rs @@ -23,7 +23,7 @@ use teloxide::{ stop_token::AsyncStopToken, update_listeners::{self, StatefulListener}, }, - prelude2::*, + prelude::*, types::Update, }; @@ -41,7 +41,7 @@ async fn main() { let bot = Bot::from_env().auto_send(); - teloxide::repls2::repl_with_listener( + teloxide::repl_with_listener( bot.clone(), |msg: Message, bot: AutoSend| async move { bot.send_message(msg.chat.id, "pong").await?; diff --git a/examples/inline.rs b/examples/inline.rs index bbc57be4..be8871c5 100644 --- a/examples/inline.rs +++ b/examples/inline.rs @@ -1,5 +1,5 @@ use teloxide::{ - prelude2::*, + prelude::*, types::{ InlineQueryResult, InlineQueryResultArticle, InputMessageContent, InputMessageContentText, }, diff --git a/examples/ngrok_ping_pong.rs b/examples/ngrok_ping_pong.rs index 8b866625..17929a6d 100644 --- a/examples/ngrok_ping_pong.rs +++ b/examples/ngrok_ping_pong.rs @@ -6,7 +6,7 @@ use teloxide::{ stop_token::AsyncStopToken, update_listeners::{self, StatefulListener}, }, - prelude2::*, + prelude::*, types::Update, }; @@ -24,7 +24,7 @@ async fn main() { let bot = Bot::from_env().auto_send(); - teloxide::repls2::repl_with_listener( + teloxide::repl_with_listener( bot.clone(), |msg: Message, bot: AutoSend| async move { bot.send_message(msg.chat.id, "pong").await?; diff --git a/examples/shared_state.rs b/examples/shared_state.rs index a5e762c4..49b6b5f9 100644 --- a/examples/shared_state.rs +++ b/examples/shared_state.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use once_cell::sync::Lazy; -use teloxide::prelude2::*; +use teloxide::prelude::*; static MESSAGES_TOTAL: Lazy = Lazy::new(AtomicU64::default); diff --git a/examples/simple_commands.rs b/examples/simple_commands.rs index 47b4337a..ba2203b1 100644 --- a/examples/simple_commands.rs +++ b/examples/simple_commands.rs @@ -1,7 +1,17 @@ -use teloxide::{prelude2::*, utils::command::BotCommand}; +use teloxide::{prelude::*, utils::command::BotCommand}; use std::error::Error; +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + log::info!("Starting simple_commands_bot..."); + + let bot = Bot::from_env().auto_send(); + + teloxide::commands_repl(bot, answer, Command::ty()).await; +} + #[derive(BotCommand, Clone)] #[command(rename = "lowercase", description = "These commands are supported:")] enum Command { @@ -34,13 +44,3 @@ async fn answer( Ok(()) } - -#[tokio::main] -async fn main() { - pretty_env_logger::init(); - log::info!("Starting simple_commands_bot..."); - - let bot = Bot::from_env().auto_send(); - - teloxide::repls2::commands_repl(bot, answer, Command::ty()).await; -} diff --git a/src/dispatching/dialogue/dialogue_dispatcher.rs b/src/dispatching/dialogue/dialogue_dispatcher.rs deleted file mode 100644 index 6c9703e9..00000000 --- a/src/dispatching/dialogue/dialogue_dispatcher.rs +++ /dev/null @@ -1,291 +0,0 @@ -use crate::dispatching::{ - dialogue::{ - DialogueDispatcherHandler, DialogueStage, DialogueWithCx, GetChatId, InMemStorage, Storage, - }, - DispatcherHandler, UpdateWithCx, -}; -use std::{fmt::Debug, marker::PhantomData}; - -use futures::{future::BoxFuture, FutureExt, StreamExt}; -use tokio::sync::mpsc; - -use crate::dispatching::dialogue::InMemStorageError; -use flurry::HashMap; -use std::sync::{Arc, Mutex}; -use teloxide_core::requests::Requester; -use tokio_stream::wrappers::UnboundedReceiverStream; - -/// A dispatcher of dialogues. -/// -/// Note that it implements [`DispatcherHandler`], so you can just put an -/// instance of this dispatcher into the [`Dispatcher`]'s methods. -/// -/// Note that when the storage methods [`Storage::remove_dialogue`] and -/// [`Storage::update_dialogue`] are failed, the errors are logged, but a result -/// from [`Storage::get_dialogue`] is provided to a user handler as-is so you -/// can respond to a concrete user with an error description. -/// -/// See the [module-level documentation](crate::dispatching::dialogue) for the -/// design overview. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`DispatcherHandler`]: crate::dispatching::DispatcherHandler -#[deprecated(note = "Use dispatching2 instead")] -pub struct DialogueDispatcher { - storage: Arc, - handler: Arc, - _phantom: PhantomData>, - - /// A lock-free map to handle updates from the same chat sequentially, but - /// concurrently from different chats. - /// - /// A value is the TX part of an unbounded asynchronous MPSC channel. A - /// handler that executes updates from the same chat ID sequentially - /// handles the RX part. - senders: Arc>>>, -} - -impl DialogueDispatcher, H, Upd> -where - H: DialogueDispatcherHandler + Send + Sync + 'static, - Upd: GetChatId + Send + 'static, - D: Default + Send + 'static, -{ - /// Creates a dispatcher with the specified `handler` and [`InMemStorage`] - /// (a default storage). - /// - /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage - #[must_use] - pub fn new(handler: H) -> Self { - Self { - storage: InMemStorage::new(), - handler: Arc::new(handler), - senders: Arc::new(HashMap::new()), - _phantom: PhantomData, - } - } -} - -impl DialogueDispatcher -where - H: DialogueDispatcherHandler + Send + Sync + 'static, - Upd: GetChatId + Send + 'static, - D: Default + Send + 'static, - S: Storage + Send + Sync + 'static, - S::Error: Debug + Send + 'static, -{ - /// Creates a dispatcher with the specified `handler` and `storage`. - #[must_use] - pub fn with_storage(handler: H, storage: Arc) -> Self { - Self { - storage, - handler: Arc::new(handler), - senders: Arc::new(HashMap::new()), - _phantom: PhantomData, - } - } - - #[must_use] - fn new_tx(&self) -> mpsc::UnboundedSender> - where - R: Requester + Send + 'static, - { - let (tx, rx) = mpsc::unbounded_channel(); - - let storage = Arc::clone(&self.storage); - let handler = Arc::clone(&self.handler); - let senders = Arc::clone(&self.senders); - - tokio::spawn(UnboundedReceiverStream::new(rx).for_each(move |cx: UpdateWithCx| { - let storage = Arc::clone(&storage); - let handler = Arc::clone(&handler); - let senders = Arc::clone(&senders); - - async move { - let chat_id = cx.update.chat_id(); - - let dialogue = - Arc::clone(&storage).get_dialogue(chat_id).await.map(Option::unwrap_or_default); - - match handler.handle(DialogueWithCx { cx, dialogue }).await { - DialogueStage::Next(new_dialogue) => { - if let Err(e) = storage.update_dialogue(chat_id, new_dialogue).await { - log::error!("Storage::update_dialogue failed: {:?}", e); - } - } - DialogueStage::Exit => { - // On the next .poll() call, the spawned future will - // return Poll::Ready, because we are dropping the - // sender right here: - senders.pin().remove(&chat_id); - - if let Err(e) = storage.remove_dialogue(chat_id).await { - log::error!("Storage::remove_dialogue failed: {:?}", e); - } - } - } - } - })); - - tx - } -} - -impl DispatcherHandler for DialogueDispatcher -where - H: DialogueDispatcherHandler + Send + Sync + 'static, - Upd: GetChatId + Send + 'static, - D: Default + Send + 'static, - S: Storage + Send + Sync + 'static, - S::Error: Debug + Send + 'static, - R: Requester + Send, -{ - fn handle( - self, - updates: mpsc::UnboundedReceiver>, - ) -> BoxFuture<'static, ()> - where - UpdateWithCx: 'static, - { - let this = Arc::new(self); - - UnboundedReceiverStream::new(updates) - .for_each(move |cx| { - let this = Arc::clone(&this); - let chat_id = cx.update.chat_id(); - - match this.senders.pin().get(&chat_id) { - // An old dialogue - Some(tx) => { - assert!( - tx.send(cx).is_ok(), - "We are not dropping a receiver or call .close() on it" - ); - } - None => { - let tx = this.new_tx(); - assert!( - tx.send(cx).is_ok(), - "We are not dropping a receiver or call .close() on it" - ); - this.senders.pin().insert(chat_id, tx); - } - } - - async {} - }) - .boxed() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - use futures::{stream, StreamExt}; - use lazy_static::lazy_static; - use teloxide_core::Bot; - use tokio::{ - sync::{mpsc, Mutex}, - time::Duration, - }; - - #[tokio::test] - #[allow(deprecated)] - async fn updates_from_same_chat_executed_sequentially() { - #[derive(Debug)] - struct MyUpdate { - chat_id: i64, - unique_number: u32, - } - - impl MyUpdate { - fn new(chat_id: i64, unique_number: u32) -> Self { - Self { chat_id, unique_number } - } - } - - impl GetChatId for MyUpdate { - fn chat_id(&self) -> i64 { - self.chat_id - } - } - - lazy_static! { - static ref SEQ1: Mutex> = Mutex::new(Vec::new()); - static ref SEQ2: Mutex> = Mutex::new(Vec::new()); - static ref SEQ3: Mutex> = Mutex::new(Vec::new()); - } - - let dispatcher = DialogueDispatcher::new( - |cx: DialogueWithCx| async move { - tokio::time::sleep(Duration::from_millis(300)).await; - - match cx.cx.update { - MyUpdate { chat_id: 1, unique_number } => { - SEQ1.lock().await.push(unique_number); - } - MyUpdate { chat_id: 2, unique_number } => { - SEQ2.lock().await.push(unique_number); - } - MyUpdate { chat_id: 3, unique_number } => { - SEQ3.lock().await.push(unique_number); - } - _ => unreachable!(), - } - - DialogueStage::Next(()) - }, - ); - - let updates = stream::iter( - vec![ - MyUpdate::new(1, 174), - MyUpdate::new(1, 125), - MyUpdate::new(2, 411), - MyUpdate::new(1, 2), - MyUpdate::new(2, 515), - MyUpdate::new(2, 623), - MyUpdate::new(1, 193), - MyUpdate::new(1, 104), - MyUpdate::new(2, 2222), - MyUpdate::new(2, 737), - MyUpdate::new(3, 72782), - MyUpdate::new(3, 2737), - MyUpdate::new(1, 7), - MyUpdate::new(1, 7778), - MyUpdate::new(3, 5475), - MyUpdate::new(3, 1096), - MyUpdate::new(3, 872), - MyUpdate::new(2, 10), - MyUpdate::new(2, 55456), - MyUpdate::new(3, 5665), - MyUpdate::new(3, 1611), - ] - .into_iter() - .map(|update| UpdateWithCx { update, requester: Bot::new("Doesn't matter here") }) - .collect::>>(), - ); - - let (tx, rx) = mpsc::unbounded_channel(); - - updates - .for_each(move |update| { - let tx = tx.clone(); - - async move { - assert!(tx.send(update).is_ok(), "tx.send(update) failed"); - } - }) - .await; - - dispatcher.handle(rx).await; - - // Wait until our futures to be finished. - tokio::time::sleep(Duration::from_millis(3000)).await; - - assert_eq!(*SEQ1.lock().await, vec![174, 125, 2, 193, 104, 7, 7778]); - assert_eq!(*SEQ2.lock().await, vec![411, 515, 623, 2222, 737, 10, 55456]); - assert_eq!(*SEQ3.lock().await, vec![72782, 2737, 5475, 1096, 872, 5665, 1611]); - } -} diff --git a/src/dispatching/dialogue/dialogue_dispatcher_handler.rs b/src/dispatching/dialogue/dialogue_dispatcher_handler.rs deleted file mode 100644 index d3bd92c9..00000000 --- a/src/dispatching/dialogue/dialogue_dispatcher_handler.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::dispatching::dialogue::{DialogueStage, DialogueWithCx}; -use futures::future::BoxFuture; -use std::{future::Future, sync::Arc}; - -/// An asynchronous handler of an update used in [`DialogueDispatcher`]. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching::dialogue). -/// -/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher -#[deprecated(note = "Use dispatching2 instead")] -pub trait DialogueDispatcherHandler { - #[must_use] - fn handle( - self: Arc, - cx: DialogueWithCx, - ) -> BoxFuture<'static, DialogueStage> - where - DialogueWithCx: Send + 'static, - R: Send, - Upd: Send, - D: Send, - E: Send; -} - -impl DialogueDispatcherHandler for F -where - F: Fn(DialogueWithCx) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, -{ - fn handle(self: Arc, cx: DialogueWithCx) -> BoxFuture<'static, Fut::Output> - where - DialogueWithCx: Send + 'static, - R: Send, - Upd: Send, - D: Send, - E: Send, - { - Box::pin(async move { self(cx).await }) - } -} diff --git a/src/dispatching/dialogue/dialogue_stage.rs b/src/dispatching/dialogue/dialogue_stage.rs deleted file mode 100644 index c4a29744..00000000 --- a/src/dispatching/dialogue/dialogue_stage.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::dispatching::dialogue::TransitionOut; - -/// Continue or terminate a dialogue. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching::dialogue). -#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)] -#[deprecated(note = "Use dispatching2 instead")] -pub enum DialogueStage { - Next(D), - Exit, -} - -/// Returns a new dialogue state. -/// -/// Note the `Dialogue: From` constraint. It means that you don't need to -/// pass `Dialogue` -- you can just pass one of it's states. [`From`] can be -/// conveniently derived by [derive-more]. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching::dialogue). -/// -/// [`From`]: std::convert::From -/// [derive-more]: https://crates.io/crates/derive_more -#[deprecated(note = "Use dispatching2 instead")] -pub fn next(new_state: State) -> TransitionOut -where - Dialogue: From, -{ - Ok(DialogueStage::Next(Dialogue::from(new_state))) -} - -/// Exits a dialogue. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching::dialogue). -#[deprecated(note = "Use dispatching2 instead")] -pub fn exit() -> TransitionOut { - Ok(DialogueStage::Exit) -} diff --git a/src/dispatching/dialogue/dialogue_with_cx.rs b/src/dispatching/dialogue/dialogue_with_cx.rs deleted file mode 100644 index 19240bdb..00000000 --- a/src/dispatching/dialogue/dialogue_with_cx.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::dispatching::{dialogue::GetChatId, UpdateWithCx}; -use std::fmt::Debug; -use teloxide_core::requests::Requester; - -/// A context of a [`DialogueDispatcher`]'s message handler. -/// -/// See [the module-level documentation for the design -/// overview](crate::dispatching::dialogue). -/// -/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher -#[derive(Debug)] -#[deprecated(note = "Use dispatching2 instead")] -pub struct DialogueWithCx { - pub cx: UpdateWithCx, - pub dialogue: Result, -} - -impl DialogueWithCx { - /// Creates a new instance with the provided fields. - pub fn new(cx: UpdateWithCx, dialogue: D) -> Self { - Self { cx, dialogue: Ok(dialogue) } - } -} - -impl GetChatId for DialogueWithCx -where - Upd: GetChatId, - R: Requester, -{ - fn chat_id(&self) -> i64 { - self.cx.update.chat_id() - } -} diff --git a/src/dispatching/dialogue/get_chat_id.rs b/src/dispatching/dialogue/get_chat_id.rs index 72014f8e..eb2a1bf1 100644 --- a/src/dispatching/dialogue/get_chat_id.rs +++ b/src/dispatching/dialogue/get_chat_id.rs @@ -1,14 +1,20 @@ +use crate::types::CallbackQuery; use teloxide_core::types::Message; -/// Something that has a chat ID. -#[deprecated(note = "Use dispatching2 instead")] +/// Something that may has a chat ID. pub trait GetChatId { #[must_use] - fn chat_id(&self) -> i64; + fn chat_id(&self) -> Option; } impl GetChatId for Message { - fn chat_id(&self) -> i64 { - self.chat.id + fn chat_id(&self) -> Option { + Some(self.chat.id) + } +} + +impl GetChatId for CallbackQuery { + fn chat_id(&self) -> Option { + self.message.as_ref().map(|mes| mes.chat.id) } } diff --git a/src/dispatching/dialogue/mod.rs b/src/dispatching/dialogue/mod.rs index 2a3e889b..ef8ec3c3 100644 --- a/src/dispatching/dialogue/mod.rs +++ b/src/dispatching/dialogue/mod.rs @@ -1,172 +1,181 @@ -//! Dealing with dialogues. +//! Support for user dialogues. //! -//! There are three main components: +//! The main type is (surprise!) [`Dialogue`]. Under the hood, it is just a +//! wrapper over [`Storage`] and a chat ID. All it does is provides convenient +//! method for manipulating the dialogue state. [`Storage`] is where all +//! dialogue states are stored; it can be either [`InMemStorage`], which is a +//! simple hash map, or database wrappers such as [`SqliteStorage`]. In the +//! latter case, your dialogues are _persistent_, meaning that you can safely +//! restart your bot and all dialogues will remain in the database -- this is a +//! preferred method for production bots. //! -//! 1. Your type `D` (typically an enumeration), implementing [`Transition`]. -//! It is essentially a [FSM]: its variants are possible dialogue states and -//! [`Transition::react`] is a transition function. +//! [`examples/dialogue.rs`] clearly demonstrates the typical usage of +//! dialogues. Your dialogue state can be represented as an enumeration: //! -//! 2. State types, forming `D`. They implement [`Subtransition`]. +//! ```ignore +//! #[derive(DialogueState, Clone)] +//! #[handler_out(anyhow::Result<()>)] +//! pub enum State { +//! #[handler(handle_start)] +//! Start, //! -//! 2. [`Storage`], which encapsulates all the dialogues. +//! #[handler(handle_receive_full_name)] +//! ReceiveFullName, //! -//! 3. [`DialogueDispatcher`], which encapsulates your handler, [`Storage`], -//! and implements [`DispatcherHandler`]. +//! #[handler(handle_receive_age)] +//! ReceiveAge { full_name: String }, //! -//! You pass [`DialogueDispatcher`] into [`Dispatcher`]. Every time -//! [`Dispatcher`] sees an incoming input, it is transferred into -//! [`DialogueDispatcher`], and the following steps are executed: -//! -//! 1. If a storage doesn't contain a dialogue from this chat, supply -//! `D::default()` into you handler, otherwise, supply the saved dialogue -//! from this chat. -//! 2. If a handler has returned [`DialogueStage::Exit`], remove the dialogue -//! from the storage, otherwise ([`DialogueStage::Next`]) force the storage to -//! update the dialogue. -//! -//! To avoid boilerplate, teloxide exports these convenient things: the [`next`] -//! and [`exit`] functions, and `#[derive(BotDialogue)]` with -//! `#[teloxide(subtransition)]`. Here's how your dialogues management code -//! skeleton should look like: -//! -//! ```no_run -//! # #[cfg(feature = "macros")] { -//! use std::convert::Infallible; -//! -//! use teloxide::{ -//! dispatching::dialogue::{InMemStorageError, Transition}, -//! prelude::*, -//! teloxide, RequestError, -//! }; -//! -//! #[derive(Clone)] -//! struct _1State; -//! #[derive(Clone)] -//! struct _2State; -//! #[derive(Clone)] -//! struct _3State; -//! -//! type Out = TransitionOut; -//! -//! #[teloxide(subtransition)] -//! async fn _1_transition(_state: _1State, _cx: TransitionIn>) -> Out { -//! todo!() +//! #[handler(handle_receive_location)] +//! ReceiveLocation { full_name: String, age: u8 }, //! } -//! -//! #[teloxide(subtransition)] -//! async fn _2_transition(_state: _2State, _cx: TransitionIn>) -> Out { -//! todo!() -//! } -//! -//! #[teloxide(subtransition)] -//! async fn _3_transition(_state: _3State, _cx: TransitionIn>) -> Out { -//! todo!() -//! } -//! -//! #[derive(Clone, Transition)] -//! enum D { -//! _1(_1State), -//! _2(_2State), -//! _3(_3State), -//! } -//! -//! impl Default for D { -//! fn default() -> Self { -//! Self::_1(_1State) -//! } -//! } -//! -//! type In = DialogueWithCx, Message, D, InMemStorageError>; -//! -//! #[tokio::main] -//! async fn main() { -//! pretty_env_logger::init(); -//! log::info!("Starting dialogue_bot!"); -//! -//! let bot = Bot::from_env().auto_send(); -//! -//! Dispatcher::new(bot) -//! .messages_handler(DialogueDispatcher::new( -//! |DialogueWithCx { cx, dialogue }: In| async move { -//! // No panic because of std::convert::Infallible. -//! let dialogue = dialogue.unwrap(); -//! dialogue -//! // Instead of () you can pass an arbitrary value, see below. -//! .react(cx, ()) -//! .await -//! .expect("Something wrong with the bot!") -//! }, -//! )) -//! .dispatch() -//! .await; -//! } -//! # } //! ``` //! -//! - `#[teloxide(subtransition)]` implements [`Subtransition`] for the first -//! argument of a function. -//! - `#[derive(Transition)]` implements [`Transition`] for `D`, if all the -//! variants implement [`Subtransition`]. +//! Each state is associated with its respective handler: e.g., when a dialogue +//! state is `ReceiveAge`, `handle_receive_age` is invoked: //! -//! `()` in `.react(cx, ())` is an arbitrary value, which you can pass into -//! Subtransitions. Just append `ans: T` to the parameters of the -//! Subtransitions to pass a differen type. +//! ```ignore +//! async fn handle_receive_age( +//! bot: AutoSend, +//! msg: Message, +//! dialogue: MyDialogue, +//! (full_name,): (String,), // Available from `State::ReceiveAge`. +//! ) -> anyhow::Result<()> { +//! match msg.text().map(|text| text.parse::()) { +//! Some(Ok(age)) => { +//! bot.send_message(msg.chat.id, "What's your location?").await?; +//! dialogue.update(State::ReceiveLocation { full_name, age }).await?; +//! } +//! _ => { +//! bot.send_message(msg.chat.id, "Send me a number.").await?; +//! } +//! } //! -//! See [examples/dialogue_bot] as a real example. +//! Ok(()) +//! } +//! ``` //! -//! [`Transition`]: crate::dispatching::dialogue::Transition -//! [`Subtransition`]: crate::dispatching::dialogue::Subtransition -//! [`Transition::react`]: -//! crate::dispatching::dialogue::Transition::react -//! [FSM]: https://en.wikipedia.org/wiki/Finite-state_machine +//! Variant's fields are passed to state handlers as tuples: `(full_name,): +//! (String,)`. Using [`Dialogue::update`], you can update the dialogue with a +//! new state, in our case -- `State::ReceiveLocation { full_name, age }`. To +//! exit the dialogue, just call [`Dialogue::exit`] and it will be removed from +//! the inner storage: //! -//! [`Storage`]: crate::dispatching::dialogue::Storage +//! ```ignore +//! async fn handle_receive_location( +//! bot: AutoSend, +//! msg: Message, +//! dialogue: MyDialogue, +//! (full_name, age): (String, u8), // Available from `State::ReceiveLocation`. +//! ) -> anyhow::Result<()> { +//! match msg.text() { +//! Some(location) => { +//! let message = +//! format!("Full name: {}\nAge: {}\nLocation: {}", full_name, age, location); +//! bot.send_message(msg.chat.id, message).await?; +//! dialogue.exit().await?; +//! } +//! None => { +//! bot.send_message(msg.chat.id, "Send me a text message.").await?; +//! } +//! } //! -//! [`DialogueStage`]: crate::dispatching::dialogue::DialogueStage -//! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher +//! Ok(()) +//! } +//! ``` //! -//! [`DialogueStage::Exit`]: -//! crate::dispatching::dialogue::DialogueStage::Exit -//! [`DialogueStage::Next`]: crate::dispatching::dialogue::DialogueStage::Next -//! -//! [`up!`]: crate::up -//! [`next`]: crate::dispatching::dialogue::next -//! [`exit`]: crate::dispatching::dialogue::exit -//! -//! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler -//! [`Dispatcher`]: crate::dispatching::Dispatcher -//! [`Dispatcher::messages_handler`]: -//! crate::dispatching::Dispatcher::messages_handler -//! [`UpdateKind::Message(message)`]: crate::types::UpdateKind::Message -//! -//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot - -#![allow(clippy::type_complexity)] - -mod dialogue_dispatcher; -mod dialogue_dispatcher_handler; -mod dialogue_stage; -mod dialogue_with_cx; -mod get_chat_id; -mod storage; -mod transition; - -pub use dialogue_dispatcher::DialogueDispatcher; -pub use dialogue_dispatcher_handler::DialogueDispatcherHandler; -pub use dialogue_stage::{exit, next, DialogueStage}; -pub use dialogue_with_cx::DialogueWithCx; -pub use get_chat_id::GetChatId; -pub use transition::{ - Subtransition, SubtransitionOutputType, Transition, TransitionIn, TransitionOut, -}; - -#[cfg(feature = "macros")] -pub use teloxide_macros::Transition; +//! [`examples/dialogue.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/dialogue.rs #[cfg(feature = "redis-storage")] -pub use storage::{RedisStorage, RedisStorageError}; +pub use crate::dispatching::dialogue::{RedisStorage, RedisStorageError}; #[cfg(feature = "sqlite-storage")] -pub use storage::{SqliteStorage, SqliteStorageError}; +pub use crate::dispatching::dialogue::{SqliteStorage, SqliteStorageError}; +pub use get_chat_id::GetChatId; pub use storage::*; + +use std::{marker::PhantomData, sync::Arc}; + +mod get_chat_id; +mod storage; + +/// A handle for controlling dialogue state. +#[derive(Debug)] +pub struct Dialogue +where + S: ?Sized, +{ + storage: Arc, + chat_id: i64, + _phantom: PhantomData, +} + +// `#[derive]` requires generics to implement `Clone`, but `S` is wrapped around +// `Arc`, and `D` is wrapped around PhantomData. +impl Clone for Dialogue +where + S: ?Sized, +{ + fn clone(&self) -> Self { + Dialogue { storage: self.storage.clone(), chat_id: self.chat_id, _phantom: PhantomData } + } +} + +impl Dialogue +where + D: Send + 'static, + S: Storage + ?Sized, +{ + /// Constructs a new dialogue with `storage` (where dialogues are stored) + /// and `chat_id` of a current dialogue. + pub fn new(storage: Arc, chat_id: i64) -> Self { + Self { storage, chat_id, _phantom: PhantomData } + } + + /// Retrieves the current state of the dialogue or `None` if there is no + /// dialogue. + pub async fn get(&self) -> Result, S::Error> { + self.storage.clone().get_dialogue(self.chat_id).await + } + + /// Like [`Dialogue::get`] but returns a default value if there is no + /// dialogue. + pub async fn get_or_default(&self) -> Result + where + D: Default, + { + match self.get().await? { + Some(d) => Ok(d), + None => { + self.storage.clone().update_dialogue(self.chat_id, D::default()).await?; + Ok(D::default()) + } + } + } + + /// Updates the dialogue state. + /// + /// The dialogue type `D` must implement `From` to allow implicit + /// conversion from `State` to `D`. + pub async fn update(&self, state: State) -> Result<(), S::Error> + where + D: From, + { + let new_dialogue = state.into(); + self.storage.clone().update_dialogue(self.chat_id, new_dialogue).await?; + Ok(()) + } + + /// Updates the dialogue with a default value. + pub async fn reset(&self) -> Result<(), S::Error> + where + D: Default, + { + self.update(D::default()).await + } + + /// Removes the dialogue from the storage provided to [`Dialogue::new`]. + pub async fn exit(&self) -> Result<(), S::Error> { + self.storage.clone().remove_dialogue(self.chat_id).await + } +} diff --git a/src/dispatching/dialogue/transition.rs b/src/dispatching/dialogue/transition.rs deleted file mode 100644 index 31f8f167..00000000 --- a/src/dispatching/dialogue/transition.rs +++ /dev/null @@ -1,67 +0,0 @@ -use crate::dispatching::{dialogue::DialogueStage, UpdateWithCx}; -use futures::future::BoxFuture; -use teloxide_core::types::Message; - -/// Represents a transition function of a dialogue FSM. -#[deprecated(note = "Use dispatching2 instead")] -pub trait Transition: Sized { - type Aux; - type Error; - type Requester; - - /// Turns itself into another state, depending on the input message. - /// - /// `aux` will be passed to each subtransition function. - fn react( - self, - cx: TransitionIn, - aux: Self::Aux, - ) -> BoxFuture<'static, TransitionOut>; -} - -/// Like [`Transition`], but from `StateN` -> `Dialogue`. -/// -/// [`Transition`]: crate::dispatching::dialogue::Transition -#[deprecated(note = "Use dispatching2 instead")] -pub trait Subtransition -where - Self::Dialogue: Transition, -{ - type Aux; - type Dialogue; - type Error; - type Requester; - - /// Turns itself into another state, depending on the input message. - /// - /// `aux` is something that is provided by the call side, for example, - /// message's text. - fn react( - self, - cx: TransitionIn, - aux: Self::Aux, - ) -> BoxFuture<'static, TransitionOut>; -} - -/// A type returned from a FSM subtransition function. -/// -/// Now it is used only inside `#[teloxide(subtransition)]` for type inference. -#[doc(hidden)] -#[deprecated(note = "Use dispatching2 instead")] -pub trait SubtransitionOutputType { - type Output; - type Error; -} - -impl SubtransitionOutputType for TransitionOut { - type Output = D; - type Error = E; -} - -/// An input passed into a FSM (sub)transition function. -#[deprecated(note = "Use dispatching2 instead")] -pub type TransitionIn = UpdateWithCx; - -/// A type returned from a FSM (sub)transition function. -#[deprecated(note = "Use dispatching2 instead")] -pub type TransitionOut = Result, E>; diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 72cb1cd3..6a776d8e 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,263 +1,158 @@ -use std::{fmt::Debug, sync::Arc}; - use crate::{ + adaptors::CacheMe, dispatching::{ - stop_token::StopToken, - update_listeners::{self, UpdateListener}, - DispatcherHandler, UpdateWithCx, + stop_token::StopToken, update_listeners, update_listeners::UpdateListener, ShutdownToken, }, error_handlers::{ErrorHandler, LoggingErrorHandler}, + requests::Requester, + types::{AllowedUpdate, Update}, utils::shutdown_token::shutdown_check_timeout_for, }; - -use futures::{stream::FuturesUnordered, StreamExt}; +use dptree::di::{DependencyMap, DependencySupplier}; +use futures::{future::BoxFuture, StreamExt}; +use std::{collections::HashSet, fmt::Debug, ops::ControlFlow, sync::Arc}; use teloxide_core::{ - requests::Requester, - types::{ - AllowedUpdate, CallbackQuery, ChatJoinRequest, ChatMemberUpdated, ChosenInlineResult, - InlineQuery, Message, Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, - UpdateKind, - }, + requests::{Request, RequesterExt}, + types::UpdateKind, }; -use tokio::{sync::mpsc, task::JoinHandle, time::timeout}; +use tokio::time::timeout; -use crate::utils::shutdown_token::ShutdownToken; +use std::future::Future; -type Tx = Option>>; +/// The builder for [`Dispatcher`]. +pub struct DispatcherBuilder { + bot: R, + dependencies: DependencyMap, + handler: UpdateHandler, + default_handler: DefaultHandler, + error_handler: Arc + Send + Sync>, +} -/// One dispatcher to rule them all. -/// -/// See the [module-level documentation](crate::dispatching) for the design -/// overview. -#[deprecated(note = "Use dispatching2 instead")] -pub struct Dispatcher { - requester: R, +impl DispatcherBuilder +where + R: Clone + Requester + Clone + Send + Sync + 'static, + Err: Debug + Send + Sync + 'static, +{ + /// Specifies a handler that will be called for an unhandled update. + /// + /// By default, it is a mere [`log::warn`]. + #[must_use] + pub fn default_handler(self, handler: H) -> Self + where + H: Fn(Arc) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, + { + let handler = Arc::new(handler); - messages_queue: Tx, - edited_messages_queue: Tx, - channel_posts_queue: Tx, - edited_channel_posts_queue: Tx, - inline_queries_queue: Tx, - chosen_inline_results_queue: Tx, - callback_queries_queue: Tx, - shipping_queries_queue: Tx, - pre_checkout_queries_queue: Tx, - polls_queue: Tx, - poll_answers_queue: Tx, - my_chat_members_queue: Tx, - chat_members_queue: Tx, - chat_join_requests_queue: Tx, + Self { + default_handler: Box::new(move |upd| { + let handler = Arc::clone(&handler); + Box::pin(handler(upd)) + }), + ..self + } + } - running_handlers: FuturesUnordered>, + /// Specifies a handler that will be called on a handler error. + /// + /// By default, it is [`LoggingErrorHandler`]. + #[must_use] + pub fn error_handler(self, handler: Arc + Send + Sync>) -> Self { + Self { error_handler: handler, ..self } + } + + /// Specifies dependencies that can be used inside of handlers. + /// + /// By default, there is no dependencies. + #[must_use] + pub fn dependencies(self, dependencies: DependencyMap) -> Self { + Self { dependencies, ..self } + } + + /// Constructs [`Dispatcher`]. + #[must_use] + pub fn build(self) -> Dispatcher { + Dispatcher { + bot: self.bot.clone(), + cache_me_bot: self.bot.cache_me(), + dependencies: self.dependencies, + handler: self.handler, + default_handler: self.default_handler, + error_handler: self.error_handler, + allowed_updates: Default::default(), + state: ShutdownToken::new(), + } + } +} + +/// The base for update dispatching. +pub struct Dispatcher { + bot: R, + cache_me_bot: CacheMe, + dependencies: DependencyMap, + + handler: UpdateHandler, + default_handler: DefaultHandler, + error_handler: Arc + Send + Sync>, + // TODO: respect allowed_udpates + allowed_updates: HashSet, state: ShutdownToken, } -impl Dispatcher +// TODO: it is allowed to return message as response on telegram request in +// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates + +/// A handler that processes updates from Telegram. +pub type UpdateHandler = dptree::Handler<'static, DependencyMap, Result<(), Err>>; + +type DefaultHandler = Box) -> BoxFuture<'static, ()> + Send + Sync>; + +impl Dispatcher where - R: Send + 'static, + R: Requester + Clone + Send + Sync + 'static, + Err: Send + Sync + 'static, { - /// Constructs a new dispatcher with the specified `requester`. + /// Constructs a new [`DispatcherBuilder`] with `bot` and `handler`. #[must_use] - pub fn new(requester: R) -> Self { - Self { - requester, - messages_queue: None, - edited_messages_queue: None, - channel_posts_queue: None, - edited_channel_posts_queue: None, - inline_queries_queue: None, - chosen_inline_results_queue: None, - callback_queries_queue: None, - shipping_queries_queue: None, - pre_checkout_queries_queue: None, - polls_queue: None, - poll_answers_queue: None, - my_chat_members_queue: None, - chat_members_queue: None, - chat_join_requests_queue: None, - running_handlers: FuturesUnordered::new(), - state: ShutdownToken::new(), + pub fn builder(bot: R, handler: UpdateHandler) -> DispatcherBuilder + where + Err: Debug, + { + DispatcherBuilder { + bot, + dependencies: DependencyMap::new(), + handler, + default_handler: Box::new(|upd| { + log::warn!("Unhandled update: {:?}", upd); + Box::pin(async {}) + }), + error_handler: LoggingErrorHandler::new(), } } - #[must_use] - fn new_tx(&mut self, h: H) -> Tx - where - H: DispatcherHandler + Send + 'static, - Upd: Send + 'static, - R: Send + 'static, - { - let (tx, rx) = mpsc::unbounded_channel(); - let join_handle = tokio::spawn(h.handle(rx)); - - self.running_handlers.push(join_handle); - - Some(tx) - } - - /// Setup the `^C` handler which [`shutdown`]s dispatching. - /// - /// [`shutdown`]: ShutdownToken::shutdown - #[cfg(feature = "ctrlc_handler")] - #[must_use] - pub fn setup_ctrlc_handler(self) -> Self { - let token = self.state.clone(); - tokio::spawn(async move { - loop { - tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); - - match token.shutdown() { - Ok(f) => { - log::info!("^C received, trying to shutdown the dispatcher..."); - f.await; - log::info!("dispatcher is shutdown..."); - } - Err(_) => { - log::info!("^C received, the dispatcher isn't running, ignoring the signal") - } - } - } - }); - - self - } - - #[must_use] - pub fn messages_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.messages_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn edited_messages_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.edited_messages_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn channel_posts_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.channel_posts_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn edited_channel_posts_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.edited_channel_posts_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn inline_queries_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.inline_queries_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn chosen_inline_results_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.chosen_inline_results_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn callback_queries_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.callback_queries_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn shipping_queries_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.shipping_queries_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn pre_checkout_queries_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.pre_checkout_queries_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn polls_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.polls_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn poll_answers_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.poll_answers_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn my_chat_members_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.my_chat_members_queue = self.new_tx(h); - self - } - - #[must_use] - pub fn chat_members_handler(mut self, h: H) -> Self - where - H: DispatcherHandler + 'static + Send, - { - self.chat_members_queue = self.new_tx(h); - self - } - /// Starts your bot with the default parameters. /// /// The default parameters are a long polling update listener and log all - /// errors produced by this listener). + /// errors produced by this listener. /// - /// Please note that after shutting down (either because of [`shutdown`], - /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers - /// will be gone. As such, to restart listening you need to re-add - /// handlers. + /// Each time a handler is invoked, [`Dispatcher`] adds the following + /// dependencies (in addition to those passed to + /// [`DispatcherBuilder::dependencies`]): + /// + /// - Your bot passed to [`Dispatcher::builder`]; + /// - An update from Telegram; + /// - [`crate::types::Me`] (can be used in [`HandlerExt::filter_command`]). /// /// [`shutdown`]: ShutdownToken::shutdown /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler + /// [`HandlerExt::filter_command`]: crate::dispatching::HandlerExt::filter_command pub async fn dispatch(&mut self) where R: Requester + Clone, ::GetUpdates: Send, { - let listener = update_listeners::polling_default(self.requester.clone()).await; + let listener = update_listeners::polling_default(self.bot.clone()).await; let error_handler = LoggingErrorHandler::with_custom_text("An error from the update listener"); @@ -267,10 +162,7 @@ where /// Starts your bot with custom `update_listener` and /// `update_listener_error_handler`. /// - /// Please note that after shutting down (either because of [`shutdown`], - /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers - /// will be gone. As such, to restart listening you need to re-add - /// handlers. + /// This method adds the same dependencies as [`Dispatcher::dispatch`]. /// /// [`shutdown`]: ShutdownToken::shutdown /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler @@ -282,9 +174,8 @@ where UListener: UpdateListener + 'a, Eh: ErrorHandler + 'a, ListenerE: Debug, - R: Requester + Clone, { - self.hint_allowed_updates(&mut update_listener); + update_listener.hint_allowed_updates(&mut self.allowed_updates.clone().into_iter()); let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let mut stop_token = Some(update_listener.stop_token()); @@ -309,222 +200,111 @@ where if let Some(token) = stop_token.take() { log::debug!("Start shutting down dispatching..."); token.stop(); + break; } } } } - self.wait_for_handlers().await; + // TODO: wait for executing handlers? + self.state.done(); } + async fn process_update( + &self, + update: Result, + err_handler: &Arc, + ) where + LErrHandler: ErrorHandler, + { + match update { + Ok(upd) => { + if let UpdateKind::Error(err) = upd.kind { + log::error!( + "Cannot parse an update.\nError: {:?}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide/issues.", + err, + ); + return; + } + + let mut deps = self.dependencies.clone(); + deps.insert(upd); + deps.insert(self.bot.clone()); + deps.insert( + self.cache_me_bot.get_me().send().await.expect("Failed to retrieve 'me'"), + ); + + match self.handler.dispatch(deps).await { + ControlFlow::Break(Ok(())) => {} + ControlFlow::Break(Err(err)) => { + self.error_handler.clone().handle_error(err).await + } + ControlFlow::Continue(deps) => { + let upd = deps.get(); + (self.default_handler)(upd).await; + } + } + } + Err(err) => err_handler.clone().handle_error(err).await, + } + } + + /// Setups the `^C` handler that [`shutdown`]s dispatching. + /// + /// [`shutdown`]: ShutdownToken::shutdown + #[cfg(feature = "ctrlc_handler")] + pub fn setup_ctrlc_handler(&mut self) -> &mut Self { + let token = self.state.clone(); + tokio::spawn(async move { + loop { + tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); + + match token.shutdown() { + Ok(f) => { + log::info!("^C received, trying to shutdown the dispatcher..."); + f.await; + log::info!("dispatcher is shutdown..."); + } + Err(_) => { + log::info!("^C received, the dispatcher isn't running, ignoring the signal") + } + } + } + }); + + self + } + /// Returns a shutdown token, which can later be used to shutdown /// dispatching. pub fn shutdown_token(&self) -> ShutdownToken { self.state.clone() } +} - async fn process_update( - &self, - update: Result, - update_listener_error_handler: &Arc, - ) where - R: Requester + Clone, - Eh: ErrorHandler, - ListenerE: Debug, - { - { - log::trace!("Dispatcher received an update: {:?}", update); +#[cfg(test)] +mod tests { + use std::convert::Infallible; - let update = match update { - Ok(update) => update, - Err(error) => { - Arc::clone(update_listener_error_handler).handle_error(error).await; - return; - } - }; + use teloxide_core::Bot; - match update.kind { - UpdateKind::Message(message) => { - send(&self.requester, &self.messages_queue, message, "UpdateKind::Message") - } - UpdateKind::EditedMessage(message) => send( - &self.requester, - &self.edited_messages_queue, - message, - "UpdateKind::EditedMessage", - ), - UpdateKind::ChannelPost(post) => send( - &self.requester, - &self.channel_posts_queue, - post, - "UpdateKind::ChannelPost", - ), - UpdateKind::EditedChannelPost(post) => send( - &self.requester, - &self.edited_channel_posts_queue, - post, - "UpdateKind::EditedChannelPost", - ), - UpdateKind::InlineQuery(query) => send( - &self.requester, - &self.inline_queries_queue, - query, - "UpdateKind::InlineQuery", - ), - UpdateKind::ChosenInlineResult(result) => send( - &self.requester, - &self.chosen_inline_results_queue, - result, - "UpdateKind::ChosenInlineResult", - ), - UpdateKind::CallbackQuery(query) => send( - &self.requester, - &self.callback_queries_queue, - query, - "UpdateKind::CallbackQuer", - ), - UpdateKind::ShippingQuery(query) => send( - &self.requester, - &self.shipping_queries_queue, - query, - "UpdateKind::ShippingQuery", - ), - UpdateKind::PreCheckoutQuery(query) => send( - &self.requester, - &self.pre_checkout_queries_queue, - query, - "UpdateKind::PreCheckoutQuery", - ), - UpdateKind::Poll(poll) => { - send(&self.requester, &self.polls_queue, poll, "UpdateKind::Poll") - } - UpdateKind::PollAnswer(answer) => send( - &self.requester, - &self.poll_answers_queue, - answer, - "UpdateKind::PollAnswer", - ), - UpdateKind::MyChatMember(chat_member_updated) => send( - &self.requester, - &self.my_chat_members_queue, - chat_member_updated, - "UpdateKind::MyChatMember", - ), - UpdateKind::ChatMember(chat_member_updated) => send( - &self.requester, - &self.chat_members_queue, - chat_member_updated, - "UpdateKind::MyChatMember", - ), - UpdateKind::ChatJoinRequest(chat_join_request) => send( - &self.requester, - &self.chat_join_requests_queue, - chat_join_request, - "UpdateKind::ChatJoinRequest", - ), - UpdateKind::Error(err) => { - log::error!( - "Cannot parse an update.\nError: {:?}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide/issues.", - err, - ); - } + use super::*; + + #[tokio::test] + async fn test_tokio_spawn() { + tokio::spawn(async { + // Just check that this code compiles. + if false { + Dispatcher::<_, Infallible>::builder(Bot::new(""), dptree::entry()) + .build() + .dispatch() + .await; } - } - } - - fn hint_allowed_updates(&self, listener: &mut impl UpdateListener) { - fn hint_handler_allowed_update( - queue: &Option, - kind: AllowedUpdate, - ) -> std::option::IntoIter { - queue.as_ref().map(|_| kind).into_iter() - } - - let mut allowed = hint_handler_allowed_update(&self.messages_queue, AllowedUpdate::Message) - .chain(hint_handler_allowed_update( - &self.edited_messages_queue, - AllowedUpdate::EditedMessage, - )) - .chain(hint_handler_allowed_update( - &self.channel_posts_queue, - AllowedUpdate::ChannelPost, - )) - .chain(hint_handler_allowed_update( - &self.edited_channel_posts_queue, - AllowedUpdate::EditedChannelPost, - )) - .chain(hint_handler_allowed_update( - &self.inline_queries_queue, - AllowedUpdate::InlineQuery, - )) - .chain(hint_handler_allowed_update( - &self.chosen_inline_results_queue, - AllowedUpdate::ChosenInlineResult, - )) - .chain(hint_handler_allowed_update( - &self.callback_queries_queue, - AllowedUpdate::CallbackQuery, - )) - .chain(hint_handler_allowed_update( - &self.shipping_queries_queue, - AllowedUpdate::ShippingQuery, - )) - .chain(hint_handler_allowed_update( - &self.pre_checkout_queries_queue, - AllowedUpdate::PreCheckoutQuery, - )) - .chain(hint_handler_allowed_update(&self.polls_queue, AllowedUpdate::Poll)) - .chain(hint_handler_allowed_update(&self.poll_answers_queue, AllowedUpdate::PollAnswer)) - .chain(hint_handler_allowed_update( - &self.my_chat_members_queue, - AllowedUpdate::MyChatMember, - )) - .chain(hint_handler_allowed_update( - &self.chat_members_queue, - AllowedUpdate::ChatMember, - )); - - listener.hint_allowed_updates(&mut allowed); - } - - async fn wait_for_handlers(&mut self) { - log::debug!("Waiting for handlers to finish"); - - // Drop all senders, so handlers can stop - self.messages_queue.take(); - self.edited_messages_queue.take(); - self.channel_posts_queue.take(); - self.edited_channel_posts_queue.take(); - self.inline_queries_queue.take(); - self.chosen_inline_results_queue.take(); - self.callback_queries_queue.take(); - self.shipping_queries_queue.take(); - self.pre_checkout_queries_queue.take(); - self.polls_queue.take(); - self.poll_answers_queue.take(); - self.my_chat_members_queue.take(); - self.chat_members_queue.take(); - - // Wait untill all handlers finish - self.running_handlers.by_ref().for_each(|_| async {}).await; - } -} - -fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx, update: Upd, variant: &'static str) -where - Upd: Debug, - R: Requester + Clone, -{ - if let Some(tx) = tx { - if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) { - log::error!( - "The RX part of the {} channel is closed, but an update is received.\nError:{}\n", - variant, - error - ); - } + }) + .await + .unwrap(); } } diff --git a/src/dispatching/dispatcher_handler.rs b/src/dispatching/dispatcher_handler.rs deleted file mode 100644 index 8751d080..00000000 --- a/src/dispatching/dispatcher_handler.rs +++ /dev/null @@ -1,31 +0,0 @@ -use std::future::Future; - -use crate::dispatching::{DispatcherHandlerRx, UpdateWithCx}; -use futures::future::BoxFuture; - -/// An asynchronous handler of a stream of updates used in [`Dispatcher`]. -/// -/// See the [module-level documentation](crate::dispatching) for the design -/// overview. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -#[deprecated(note = "Use dispatching2 instead")] -pub trait DispatcherHandler { - #[must_use] - fn handle(self, updates: DispatcherHandlerRx) -> BoxFuture<'static, ()> - where - UpdateWithCx: Send + 'static; -} - -impl DispatcherHandler for F -where - F: FnOnce(DispatcherHandlerRx) -> Fut + Send + 'static, - Fut: Future + Send + 'static, -{ - fn handle(self, updates: DispatcherHandlerRx) -> BoxFuture<'static, ()> - where - UpdateWithCx: Send + 'static, - { - Box::pin(async move { self(updates).await }) - } -} diff --git a/src/dispatching/dispatcher_handler_rx_ext.rs b/src/dispatching/dispatcher_handler_rx_ext.rs deleted file mode 100644 index 9270826f..00000000 --- a/src/dispatching/dispatcher_handler_rx_ext.rs +++ /dev/null @@ -1,62 +0,0 @@ -use crate::{dispatching::UpdateWithCx, utils::command::BotCommand}; -use futures::{stream::BoxStream, Stream, StreamExt}; -use teloxide_core::types::Message; - -/// An extension trait to be used with [`DispatcherHandlerRx`]. -/// -/// See the [module-level documentation](crate::dispatching) for the design -/// overview. -/// -/// [`DispatcherHandlerRx`]: crate::dispatching::DispatcherHandlerRx -#[deprecated(note = "Use dispatching2 instead")] -pub trait DispatcherHandlerRxExt { - /// Extracts only text messages from this stream of arbitrary messages. - fn text_messages(self) -> BoxStream<'static, (UpdateWithCx, String)> - where - Self: Stream>, - R: Send + 'static; - - /// Extracts only commands with their arguments from this stream of - /// arbitrary messages. - fn commands(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx, C)> - where - Self: Stream>, - C: BotCommand, - N: Into + Send, - R: Send + 'static; -} - -impl DispatcherHandlerRxExt for T -where - T: Send + 'static, -{ - fn text_messages(self) -> BoxStream<'static, (UpdateWithCx, String)> - where - Self: Stream>, - R: Send + 'static, - { - self.filter_map(|cx| async move { - let text = cx.update.text().map(ToOwned::to_owned); - text.map(move |text| (cx, text)) - }) - .boxed() - } - - fn commands(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx, C)> - where - Self: Stream>, - C: BotCommand, - N: Into + Send, - R: Send + 'static, - { - let bot_name = bot_name.into(); - - self.text_messages() - .filter_map(move |(cx, text)| { - let bot_name = bot_name.clone(); - - async move { C::parse(&text, &bot_name).map(|command| (cx, command)).ok() } - }) - .boxed() - } -} diff --git a/src/dispatching2/filter_ext.rs b/src/dispatching/filter_ext.rs similarity index 100% rename from src/dispatching2/filter_ext.rs rename to src/dispatching/filter_ext.rs diff --git a/src/dispatching2/handler_ext.rs b/src/dispatching/handler_ext.rs similarity index 99% rename from src/dispatching2/handler_ext.rs rename to src/dispatching/handler_ext.rs index 6fcafca1..a111353a 100644 --- a/src/dispatching2/handler_ext.rs +++ b/src/dispatching/handler_ext.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - dispatching2::{ + dispatching::{ dialogue::{Dialogue, GetChatId, Storage}, HandlerFactory, }, diff --git a/src/dispatching2/handler_factory.rs b/src/dispatching/handler_factory.rs similarity index 100% rename from src/dispatching2/handler_factory.rs rename to src/dispatching/handler_factory.rs diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index 0aaebea0..d7bb794a 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -1,74 +1,113 @@ -//! Old updates dispatching (**DEPRECATED**: use [`crate::dispatching2`] -//! instead). +//! An update dispatching model based on [`dptree`]. //! -//! The key type here is [`Dispatcher`]. It encapsulates [`Bot`] and handlers -//! for [all the update kinds]. +//! In teloxide, updates are dispatched by a pipleine. The central type is +//! [`dptree::Handler`] -- it represents a handler of an update; since the API +//! is highly declarative, you can combine handlers with each other via such +//! methods as [`dptree::Handler::chain`] and [`dptree::Handler::branch`]. The +//! former method pipes one handler to another one, whilst the latter creates a +//! new node, as communicated by the name. For more information, please refer to +//! the documentation of [`dptree`]. //! -//! Every handler accept [`tokio::sync::mpsc::UnboundedReceiver`] (the RX halve -//! of an asynchronous channel). Inside a body of your handler, you typically -//! asynchronously concurrently iterate through updates like this: +//! The pattern itself is called [chain of responsibility], a well-known design +//! technique across OOP developers. But unlike typical object-oriented design, +//! we employ declarative FP-style functions like [`dptree::filter`], +//! [`dptree::filter_map`], and [`dptree::endpoint`]; these functions create +//! special forms of [`dptree::Handler`]; for more information, please refer to +//! their respective documentation. Each of these higher-order functions accept +//! a closure that is made into a handler -- this closure can take any +//! additional parameters, which must be supplied while creating [`Dispatcher`] +//! (see [`DispatcherBuilder::dependencies`]). //! -//! ``` +//! The [`Dispatcher`] type puts all these things together: it only provides +//! [`Dispatcher::dispatch`] and a handful of other methods. Once you call +//! `.dispatch()`, it will retrieve updates from the Telegram server and pass +//! them to your handler, which is a parameter of [`Dispatcher::builder`]. +//! +//! Let us look at a simple example: +//! +//! +//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/shared_state.rs)) +//! +//! ```no_run +//! use std::sync::atomic::{AtomicU64, Ordering}; +//! +//! use once_cell::sync::Lazy; //! use teloxide::prelude::*; -//! use tokio_stream::wrappers::UnboundedReceiverStream; //! -//! async fn handle_messages(rx: DispatcherHandlerRx, Message>) { -//! UnboundedReceiverStream::new(rx) -//! .for_each_concurrent(None, |message| async move { -//! dbg!(message.update); -//! }) -//! .await; -//! } +//! static MESSAGES_TOTAL: Lazy = Lazy::new(AtomicU64::default); +//! +//! # #[tokio::main] +//! # async fn main() { +//! pretty_env_logger::init(); +//! log::info!("Starting shared_state_bot..."); +//! +//! let bot = Bot::from_env().auto_send(); +//! +//! let handler = Update::filter_message().branch(dptree::endpoint( +//! |msg: Message, bot: AutoSend| async move { +//! let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed); +//! bot.send_message(msg.chat.id, format!("I received {} messages in total.", previous)) +//! .await?; +//! respond(()) +//! }, +//! )); +//! +//! Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await; +//! # } //! ``` //! -//! When [`Update`] is received from Telegram, [`Dispatcher`] pushes it into an -//! appropriate handler, depending on its kind. That's simple! +//! 1. First, we create the bot: `let bot = Bot::from_env().auto_send()`. +//! 2. Then we construct an update handler. While it is possible to handle all +//! kinds of [`crate::types::Update`], here we are only interested in +//! [`crate::types::Message`]: [`UpdateFilterExt::filter_message`] create a +//! handler object which filters all messages out of a generic update. +//! 3. By doing `.branch(dptree::endpoint(...))`, we set up a custom handling +//! closure that receives `msg: Message` and `bot: AutoSend`. There are +//! called dependencies: `msg` is supplied by +//! [`UpdateFilterExt::filter_message`], while `bot` is supplied by +//! [`Dispatcher`]. //! -//! **Note** that handlers must implement [`DispatcherHandler`], which means -//! that: -//! - You are able to supply [`DialogueDispatcher`] as a handler. -//! - You are able to supply functions that accept -//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future` -//! as a handler. +//! That being said, if we receive a message, the dispatcher will call our +//! handler, but if we receive something other than a message (e.g., a channel +//! post), you will see an unhandled update notice in your terminal. //! -//! Since they implement [`DispatcherHandler`] too. +//! This is a very limited example of update pipelining facilities. In more +//! involved scenarios, there are multiple branches and chains; if one element +//! of a chain fails to handle an update, the update will be passed forwards; if +//! no handler succeeds at handling the update, [`Dispatcher`] will invoke a +//! default handler set up via [`DispatcherBuilder::default_handler`]. //! -//! [See the examples](https://github.com/teloxide/teloxide/tree/master/examples). +//! Update pipelining provides several advantages over the typical `match +//! (update.kind) { ... }` approach: //! -//! [`Dispatcher`]: crate::dispatching::Dispatcher -//! [all the update kinds]: crate::types::UpdateKind -//! [`Update`]: crate::types::Update -//! [`ErrorHandler`]: crate::dispatching::ErrorHandler -//! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler -//! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher -//! [`DispatcherHandlerResult`]: crate::dispatching::DispatcherHandlerResult -//! [`Bot`]: crate::Bot -//! [`tokio::sync::mpsc::UnboundedReceiver`]: https://docs.rs/tokio/0.2.11/tokio/sync/mpsc/struct.UnboundedReceiver.html -//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot +//! 1. It supports _extension_: e.g., you +//! can define extension filters or some other handlers and then combine them in +//! a single place, thus facilitating loose coupling. +//! 2. Pipelining exhibits a natural syntax for expressing message processing. +//! 3. Lastly, it provides a primitive form of [dependency injection (DI)], +//! which allows you to deal with such objects as a bot and various update types +//! easily. +//! +//! For a more involved example, see [`examples/dispatching_features.rs`](https://github.com/teloxide/teloxide/blob/master/examples/dispatching_features.rs). +//! +//! TODO: explain a more involved example with multiple branches. +//! +//! [chain of responsibility]: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern +//! [dependency injection (DI)]: https://en.wikipedia.org/wiki/Dependency_injection -#![allow(deprecated)] +#[cfg(all(feature = "ctrlc_handler"))] +pub mod repls; pub mod dialogue; +mod dispatcher; +mod filter_ext; +mod handler_ext; +mod handler_factory; pub mod stop_token; pub mod update_listeners; -#[cfg(feature = "ctrlc_handler")] -pub(crate) mod repls; - -mod dispatcher; -mod dispatcher_handler; -mod dispatcher_handler_rx_ext; -mod update_with_cx; - pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken}; -pub use dispatcher::Dispatcher; -pub use dispatcher_handler::DispatcherHandler; -pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; -use tokio::sync::mpsc::UnboundedReceiver; -pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType}; - -/// A type of a stream, consumed by [`Dispatcher`]'s handlers. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -#[deprecated(note = "Use dispatching2 instead")] -pub type DispatcherHandlerRx = UnboundedReceiver>; +pub use dispatcher::{Dispatcher, DispatcherBuilder, UpdateHandler}; +pub use filter_ext::{MessageFilterExt, UpdateFilterExt}; +pub use handler_ext::HandlerExt; +pub use handler_factory::HandlerFactory; diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index d87cb83f..54ee2ef7 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -1,46 +1,46 @@ use crate::{ dispatching::{ - update_listeners, update_listeners::UpdateListener, Dispatcher, DispatcherHandlerRx, - DispatcherHandlerRxExt, UpdateWithCx, + update_listeners, update_listeners::UpdateListener, HandlerExt, UpdateFilterExt, }, - error_handlers::{LoggingErrorHandler, OnError}, + error_handlers::LoggingErrorHandler, + types::Update, utils::command::BotCommand, }; -use futures::StreamExt; -use std::{fmt::Debug, future::Future, sync::Arc}; -use teloxide_core::{requests::Requester, types::Message}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use dptree::di::{DependencyMap, Injectable}; +use std::{fmt::Debug, marker::PhantomData}; +use teloxide_core::requests::Requester; /// A [REPL] for commands. /// /// All errors from an update listener and handler will be logged. /// -/// # Caution +/// ## Caution /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// because Telegram disallow multiple requests at the same time from the same /// bot. /// +/// ## Dependency requirements +/// +/// - Those of [`HandlerExt::filter_command`]. +/// /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher #[cfg(feature = "ctrlc_handler")] -pub async fn commands_repl(requester: R, bot_name: N, handler: H) +pub async fn commands_repl<'a, R, Cmd, H, E, Args>(bot: R, handler: H, cmd: PhantomData) where - Cmd: BotCommand + Send + 'static, - H: Fn(UpdateWithCx, Cmd) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, - Result<(), HandlerE>: OnError, - HandlerE: Debug + Send, - N: Into + Send + 'static, - R: Requester + Send + Clone + 'static, + Cmd: BotCommand + Send + Sync + 'static, + H: Injectable, Args> + Send + Sync + 'static, + R: Requester + Clone + Send + Sync + 'static, ::GetUpdates: Send, + E: Debug + Send + Sync + 'static, { - let cloned_requester = requester.clone(); + let cloned_bot = bot.clone(); commands_repl_with_listener( - requester, - bot_name, + bot, handler, - update_listeners::polling_default(cloned_requester).await, + update_listeners::polling_default(cloned_bot).await, + cmd, ) .await; } @@ -49,50 +49,55 @@ where /// /// All errors from an update listener and handler will be logged. /// -/// # Caution +/// ## Caution /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// because Telegram disallow multiple requests at the same time from the same /// bot. /// +/// ## Dependency requirements +/// +/// - Those of [`HandlerExt::filter_command`]. +/// /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`commands_repl`]: crate::dispatching::repls::commands_repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener #[cfg(feature = "ctrlc_handler")] -pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>( - requester: R, - bot_name: N, +pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>( + bot: R, handler: H, listener: L, + _cmd: PhantomData, ) where - Cmd: BotCommand + Send + 'static, - H: Fn(UpdateWithCx, Cmd) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + Cmd: BotCommand + Send + Sync + 'static, + H: Injectable, Args> + Send + Sync + 'static, L: UpdateListener + Send + 'a, ListenerE: Debug + Send + 'a, - Result<(), HandlerE>: OnError, - HandlerE: Debug + Send, - N: Into + Send + 'static, - R: Requester + Clone + Send + 'static, + R: Requester + Clone + Send + Sync + 'static, + E: Debug + Send + Sync + 'static, { - let handler = Arc::new(handler); + use crate::dispatching::Dispatcher; - Dispatcher::::new(requester) - .messages_handler(move |rx: DispatcherHandlerRx| { - UnboundedReceiverStream::new(rx).commands::(bot_name).for_each_concurrent( - None, - move |(cx, cmd)| { - let handler = Arc::clone(&handler); + // Other update types are of no interest to use since this REPL is only for + // commands. See . + let ignore_update = |_upd| Box::pin(async {}); - async move { - handler(cx, cmd).await.log_on_error().await; - } - }, - ) - }) - .setup_ctrlc_handler() + let mut dispatcher = Dispatcher::builder( + bot, + Update::filter_message().filter_command::().branch(dptree::endpoint(handler)), + ) + .default_handler(ignore_update) + .build(); + + #[cfg(feature = "ctrlc_handler")] + dispatcher.setup_ctrlc_handler(); + + // To make mutable var from immutable. + let mut dispatcher = dispatcher; + + dispatcher .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), ) - .await + .await; } diff --git a/src/dispatching/repls/dialogues_repl.rs b/src/dispatching/repls/dialogues_repl.rs index bcc6fc26..755d4dd3 100644 --- a/src/dispatching/repls/dialogues_repl.rs +++ b/src/dispatching/repls/dialogues_repl.rs @@ -30,7 +30,7 @@ where D: Clone + Default + Send + 'static, Fut: Future> + Send + 'static, R: Requester + Send + Clone + 'static, - ::GetUpdates: Send, + ::GetUpdatesFaultTolerant: Send, { let cloned_requester = requester.clone(); diff --git a/src/dispatching/repls/mod.rs b/src/dispatching/repls/mod.rs index f6bf31f0..ab1e9f5f 100644 --- a/src/dispatching/repls/mod.rs +++ b/src/dispatching/repls/mod.rs @@ -1,7 +1,8 @@ +//! REPLs for dispatching updates. + mod commands_repl; -mod dialogues_repl; mod repl; pub use commands_repl::{commands_repl, commands_repl_with_listener}; -pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener}; +//pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener}; pub use repl::{repl, repl_with_listener}; diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index 799558f7..ca5622a4 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -1,14 +1,11 @@ use crate::{ - dispatching::{ - update_listeners, update_listeners::UpdateListener, Dispatcher, DispatcherHandlerRx, - UpdateWithCx, - }, + dispatching::{update_listeners, update_listeners::UpdateListener, UpdateFilterExt}, error_handlers::{LoggingErrorHandler, OnError}, + types::Update, }; -use futures::StreamExt; -use std::{fmt::Debug, future::Future, sync::Arc}; -use teloxide_core::{requests::Requester, types::Message}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use dptree::di::{DependencyMap, Injectable}; +use std::fmt::Debug; +use teloxide_core::requests::Requester; /// A [REPL] for messages. /// @@ -22,22 +19,16 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher #[cfg(feature = "ctrlc_handler")] -pub async fn repl(requester: R, handler: H) +pub async fn repl(bot: R, handler: H) where - H: Fn(UpdateWithCx) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, + H: Injectable, Args> + Send + Sync + 'static, Result<(), E>: OnError, - E: Debug + Send, - R: Requester + Send + Clone + 'static, + E: Debug + Send + Sync + 'static, + R: Requester + Send + Sync + Clone + 'static, ::GetUpdates: Send, { - let cloned_requester = requester.clone(); - repl_with_listener( - requester, - handler, - update_listeners::polling_default(cloned_requester).await, - ) - .await; + let cloned_bot = bot.clone(); + repl_with_listener(bot, handler, update_listeners::polling_default(cloned_bot).await).await; } /// Like [`repl`], but with a custom [`UpdateListener`]. @@ -53,44 +44,34 @@ where /// [`repl`]: crate::dispatching::repls::repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener #[cfg(feature = "ctrlc_handler")] -pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>( - requester: R, - handler: H, - listener: L, -) where - H: Fn(UpdateWithCx) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, +pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(bot: R, handler: H, listener: L) +where + H: Injectable, Args> + Send + Sync + 'static, L: UpdateListener + Send + 'a, ListenerE: Debug, Result<(), E>: OnError, - E: Debug + Send, - R: Requester + Clone + Send + 'static, + E: Debug + Send + Sync + 'static, + R: Requester + Clone + Send + Sync + 'static, { - let handler = Arc::new(handler); + use crate::dispatching::Dispatcher; - Dispatcher::new(requester) - .messages_handler(|rx: DispatcherHandlerRx| { - UnboundedReceiverStream::new(rx).for_each_concurrent(None, move |message| { - let handler = Arc::clone(&handler); + // Other update types are of no interest to use since this REPL is only for + // messages. See . + let ignore_update = |_upd| Box::pin(async {}); - async move { - handler(message).await.log_on_error().await; - } - }) - }) - .setup_ctrlc_handler() + #[allow(unused_mut)] + let mut dispatcher = + Dispatcher::builder(bot, Update::filter_message().branch(dptree::endpoint(handler))) + .default_handler(ignore_update) + .build(); + + #[cfg(feature = "ctrlc_handler")] + dispatcher.setup_ctrlc_handler(); + + dispatcher .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), ) .await; } - -#[test] -fn repl_is_send() { - let bot = crate::Bot::new(""); - let repl = crate::repl(bot, |_| async { crate::respond(()) }); - assert_send(&repl); - - fn assert_send(_: &impl Send) {} -} diff --git a/src/dispatching/update_with_cx.rs b/src/dispatching/update_with_cx.rs deleted file mode 100644 index ecdc40a6..00000000 --- a/src/dispatching/update_with_cx.rs +++ /dev/null @@ -1,248 +0,0 @@ -use crate::dispatching::dialogue::GetChatId; -use teloxide_core::{ - payloads::{ - SendAnimationSetters, SendAudioSetters, SendContactSetters, SendDocumentSetters, - SendLocationSetters, SendMediaGroupSetters, SendMessageSetters, SendPhotoSetters, - SendStickerSetters, SendVenueSetters, SendVideoNoteSetters, SendVideoSetters, - SendVoiceSetters, - }, - requests::{Request, Requester}, - types::{ChatId, InputFile, InputMedia, Message}, -}; - -/// A [`Dispatcher`]'s handler's context of a bot and an update. -/// -/// See the [module-level documentation](crate::dispatching) for the design -/// overview. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -#[derive(Debug)] -#[deprecated(note = "Use dispatching2 instead")] -pub struct UpdateWithCx { - pub requester: R, - pub update: Upd, -} - -impl GetChatId for UpdateWithCx -where - Upd: GetChatId, -{ - fn chat_id(&self) -> i64 { - self.update.chat_id() - } -} - -#[doc(hidden)] -// Now it is used only inside `#[teloxide(subtransition)]` for type inference. -pub trait UpdateWithCxRequesterType { - type Requester; -} - -impl UpdateWithCxRequesterType for UpdateWithCx { - type Requester = R; -} - -impl UpdateWithCx -where - R: Requester, -{ - /// A shortcut for `.answer(text).send().await`. - #[deprecated(note = "Use .answer(text).await instead")] - pub async fn answer_str(&self, text: T) -> Result - where - T: Into, - R::SendMessage: std::future::Future, - { - self.answer(text).send().await - } - - pub fn answer(&self, text: T) -> R::SendMessage - where - T: Into, - { - self.requester.send_message(self.chat_id(), text) - } - - pub fn reply_to(&self, text: T) -> R::SendMessage - where - T: Into, - { - self.requester.send_message(self.chat_id(), text).reply_to_message_id(self.update.id) - } - - pub fn reply_audio(&self, audio: InputFile) -> R::SendAudio { - self.requester.send_audio(self.update.chat.id, audio).reply_to_message_id(self.update.id) - } - - pub fn reply_animation(&self, animation: InputFile) -> R::SendAnimation { - self.requester - .send_animation(self.update.chat.id, animation) - .reply_to_message_id(self.update.id) - } - - pub fn reply_document(&self, document: InputFile) -> R::SendDocument { - self.requester - .send_document(self.update.chat.id, document) - .reply_to_message_id(self.update.id) - } - - pub fn reply_photo(&self, photo: InputFile) -> R::SendPhoto { - self.requester.send_photo(self.update.chat.id, photo).reply_to_message_id(self.update.id) - } - - pub fn reply_video(&self, video: InputFile) -> R::SendVideo { - self.requester.send_video(self.update.chat.id, video).reply_to_message_id(self.update.id) - } - - pub fn reply_voice(&self, voice: InputFile) -> R::SendVoice { - self.requester.send_voice(self.update.chat.id, voice).reply_to_message_id(self.update.id) - } - - pub fn reply_media_group(&self, media_group: T) -> R::SendMediaGroup - where - T: IntoIterator, - { - self.requester - .send_media_group(self.update.chat.id, media_group) - .reply_to_message_id(self.update.id) - } - - pub fn reply_location(&self, latitude: f64, longitude: f64) -> R::SendLocation { - self.requester - .send_location(self.update.chat.id, latitude, longitude) - .reply_to_message_id(self.update.id) - } - - pub fn reply_venue( - &self, - latitude: f64, - longitude: f64, - title: T, - address: U, - ) -> R::SendVenue - where - T: Into, - U: Into, - { - self.requester - .send_venue(self.update.chat.id, latitude, longitude, title, address) - .reply_to_message_id(self.update.id) - } - - pub fn reply_video_note(&self, video_note: InputFile) -> R::SendVideoNote { - self.requester - .send_video_note(self.update.chat.id, video_note) - .reply_to_message_id(self.update.id) - } - - pub fn reply_contact(&self, phone_number: T, first_name: U) -> R::SendContact - where - T: Into, - U: Into, - { - self.requester - .send_contact(self.update.chat.id, phone_number, first_name) - .reply_to_message_id(self.update.id) - } - - pub fn reply_sticker(&self, sticker: InputFile) -> R::SendSticker { - self.requester - .send_sticker(self.update.chat.id, sticker) - .reply_to_message_id(self.update.id) - } - - pub fn answer_photo(&self, photo: InputFile) -> R::SendPhoto { - self.requester.send_photo(self.update.chat.id, photo) - } - - pub fn answer_audio(&self, audio: InputFile) -> R::SendAudio { - self.requester.send_audio(self.update.chat.id, audio) - } - - pub fn answer_animation(&self, animation: InputFile) -> R::SendAnimation { - self.requester.send_animation(self.update.chat.id, animation) - } - - pub fn answer_document(&self, document: InputFile) -> R::SendDocument { - self.requester.send_document(self.update.chat.id, document) - } - - pub fn answer_video(&self, video: InputFile) -> R::SendVideo { - self.requester.send_video(self.update.chat.id, video) - } - - pub fn answer_voice(&self, voice: InputFile) -> R::SendVoice { - self.requester.send_voice(self.update.chat.id, voice) - } - - pub fn answer_media_group(&self, media_group: T) -> R::SendMediaGroup - where - T: IntoIterator, - { - self.requester.send_media_group(self.update.chat.id, media_group) - } - - pub fn answer_location(&self, latitude: f64, longitude: f64) -> R::SendLocation { - self.requester.send_location(self.update.chat.id, latitude, longitude) - } - - pub fn answer_venue( - &self, - latitude: f64, - longitude: f64, - title: T, - address: U, - ) -> R::SendVenue - where - T: Into, - U: Into, - { - self.requester.send_venue(self.update.chat.id, latitude, longitude, title, address) - } - - pub fn answer_video_note(&self, video_note: InputFile) -> R::SendVideoNote { - self.requester.send_video_note(self.update.chat.id, video_note) - } - - pub fn answer_contact(&self, phone_number: T, first_name: U) -> R::SendContact - where - T: Into, - U: Into, - { - self.requester.send_contact(self.chat_id(), phone_number, first_name) - } - - pub fn answer_sticker(&self, sticker: InputFile) -> R::SendSticker { - self.requester.send_sticker(self.update.chat.id, sticker) - } - - pub fn forward_to(&self, chat_id: T) -> R::ForwardMessage - where - T: Into, - { - self.requester.forward_message(chat_id, self.update.chat.id, self.update.id) - } - - pub fn edit_message_text(&self, text: T) -> R::EditMessageText - where - T: Into, - { - self.requester.edit_message_text(self.update.chat.id, self.update.id, text) - } - - pub fn edit_message_caption(&self) -> R::EditMessageCaption { - self.requester.edit_message_caption(self.update.chat.id, self.update.id) - } - - pub fn delete_message(&self) -> R::DeleteMessage { - self.requester.delete_message(self.update.chat.id, self.update.id) - } - - pub fn pin_message(&self) -> R::PinChatMessage { - self.requester.pin_chat_message(self.update.chat.id, self.update.id) - } - - pub fn answer_dice(&self) -> R::SendDice { - self.requester.send_dice(self.update.chat.id) - } -} diff --git a/src/dispatching2/dialogue/get_chat_id.rs b/src/dispatching2/dialogue/get_chat_id.rs deleted file mode 100644 index eb2a1bf1..00000000 --- a/src/dispatching2/dialogue/get_chat_id.rs +++ /dev/null @@ -1,20 +0,0 @@ -use crate::types::CallbackQuery; -use teloxide_core::types::Message; - -/// Something that may has a chat ID. -pub trait GetChatId { - #[must_use] - fn chat_id(&self) -> Option; -} - -impl GetChatId for Message { - fn chat_id(&self) -> Option { - Some(self.chat.id) - } -} - -impl GetChatId for CallbackQuery { - fn chat_id(&self) -> Option { - self.message.as_ref().map(|mes| mes.chat.id) - } -} diff --git a/src/dispatching2/dialogue/mod.rs b/src/dispatching2/dialogue/mod.rs deleted file mode 100644 index ae5dc020..00000000 --- a/src/dispatching2/dialogue/mod.rs +++ /dev/null @@ -1,182 +0,0 @@ -//! Support for user dialogues. -//! -//! The main type is (surprise!) [`Dialogue`]. Under the hood, it is just a -//! wrapper over [`Storage`] and a chat ID. All it does is provides convenient -//! method for manipulating the dialogue state. [`Storage`] is where all -//! dialogue states are stored; it can be either [`InMemStorage`], which is a -//! simple hash map, or database wrappers such as [`SqliteStorage`]. In the -//! latter case, your dialogues are _persistent_, meaning that you can safely -//! restart your bot and all dialogues will remain in the database -- this is a -//! preferred method for production bots. -//! -//! [`examples/dialogue.rs`] clearly demonstrates the typical usage of -//! dialogues. Your dialogue state can be represented as an enumeration: -//! -//! ```ignore -//! #[derive(DialogueState, Clone)] -//! #[handler_out(anyhow::Result<()>)] -//! pub enum State { -//! #[handler(handle_start)] -//! Start, -//! -//! #[handler(handle_receive_full_name)] -//! ReceiveFullName, -//! -//! #[handler(handle_receive_age)] -//! ReceiveAge { full_name: String }, -//! -//! #[handler(handle_receive_location)] -//! ReceiveLocation { full_name: String, age: u8 }, -//! } -//! ``` -//! -//! Each state is associated with its respective handler: e.g., when a dialogue -//! state is `ReceiveAge`, `handle_receive_age` is invoked: -//! -//! ```ignore -//! async fn handle_receive_age( -//! bot: AutoSend, -//! msg: Message, -//! dialogue: MyDialogue, -//! (full_name,): (String,), // Available from `State::ReceiveAge`. -//! ) -> anyhow::Result<()> { -//! match msg.text().map(|text| text.parse::()) { -//! Some(Ok(age)) => { -//! bot.send_message(msg.chat.id, "What's your location?").await?; -//! dialogue.update(State::ReceiveLocation { full_name, age }).await?; -//! } -//! _ => { -//! bot.send_message(msg.chat.id, "Send me a number.").await?; -//! } -//! } -//! -//! Ok(()) -//! } -//! ``` -//! -//! Variant's fields are passed to state handlers as tuples: `(full_name,): -//! (String,)`. Using [`Dialogue::update`], you can update the dialogue with a -//! new state, in our case -- `State::ReceiveLocation { full_name, age }`. To -//! exit the dialogue, just call [`Dialogue::exit`] and it will be removed from -//! the inner storage: -//! -//! ```ignore -//! async fn handle_receive_location( -//! bot: AutoSend, -//! msg: Message, -//! dialogue: MyDialogue, -//! (full_name, age): (String, u8), // Available from `State::ReceiveLocation`. -//! ) -> anyhow::Result<()> { -//! match msg.text() { -//! Some(location) => { -//! let message = -//! format!("Full name: {}\nAge: {}\nLocation: {}", full_name, age, location); -//! bot.send_message(msg.chat.id, message).await?; -//! dialogue.exit().await?; -//! } -//! None => { -//! bot.send_message(msg.chat.id, "Send me a text message.").await?; -//! } -//! } -//! -//! Ok(()) -//! } -//! ``` -//! -//! [`examples/dialogue.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/dialogue.rs - -#[cfg(feature = "redis-storage")] -pub use crate::dispatching::dialogue::{RedisStorage, RedisStorageError}; - -#[cfg(feature = "sqlite-storage")] -pub use crate::dispatching::dialogue::{SqliteStorage, SqliteStorageError}; - -pub use crate::dispatching::dialogue::{ - serializer, ErasedStorage, InMemStorage, InMemStorageError, Serializer, Storage, TraceStorage, -}; -pub use get_chat_id::GetChatId; - -use std::{marker::PhantomData, sync::Arc}; - -mod get_chat_id; - -/// A handle for controlling dialogue state. -#[derive(Debug)] -pub struct Dialogue -where - S: ?Sized, -{ - storage: Arc, - chat_id: i64, - _phantom: PhantomData, -} - -// `#[derive]` requires generics to implement `Clone`, but `S` is wrapped around -// `Arc`, and `D` is wrapped around PhantomData. -impl Clone for Dialogue -where - S: ?Sized, -{ - fn clone(&self) -> Self { - Dialogue { storage: self.storage.clone(), chat_id: self.chat_id, _phantom: PhantomData } - } -} - -impl Dialogue -where - D: Send + 'static, - S: Storage + ?Sized, -{ - /// Constructs a new dialogue with `storage` (where dialogues are stored) - /// and `chat_id` of a current dialogue. - pub fn new(storage: Arc, chat_id: i64) -> Self { - Self { storage, chat_id, _phantom: PhantomData } - } - - /// Retrieves the current state of the dialogue or `None` if there is no - /// dialogue. - pub async fn get(&self) -> Result, S::Error> { - self.storage.clone().get_dialogue(self.chat_id).await - } - - /// Like [`Dialogue::get`] but returns a default value if there is no - /// dialogue. - pub async fn get_or_default(&self) -> Result - where - D: Default, - { - match self.get().await? { - Some(d) => Ok(d), - None => { - self.storage.clone().update_dialogue(self.chat_id, D::default()).await?; - Ok(D::default()) - } - } - } - - /// Updates the dialogue state. - /// - /// The dialogue type `D` must implement `From` to allow implicit - /// conversion from `State` to `D`. - pub async fn update(&self, state: State) -> Result<(), S::Error> - where - D: From, - { - let new_dialogue = state.into(); - self.storage.clone().update_dialogue(self.chat_id, new_dialogue).await?; - Ok(()) - } - - /// Updates the dialogue with a default value. - pub async fn reset(&self) -> Result<(), S::Error> - where - D: Default, - { - self.update(D::default()).await - } - - /// Removes the dialogue from the storage provided to [`Dialogue::new`]. - pub async fn exit(&self) -> Result<(), S::Error> { - self.storage.clone().remove_dialogue(self.chat_id).await - } -} diff --git a/src/dispatching2/dispatcher.rs b/src/dispatching2/dispatcher.rs deleted file mode 100644 index b4c83d7e..00000000 --- a/src/dispatching2/dispatcher.rs +++ /dev/null @@ -1,310 +0,0 @@ -use crate::{ - adaptors::CacheMe, - dispatching::{ - stop_token::StopToken, update_listeners, update_listeners::UpdateListener, ShutdownToken, - }, - error_handlers::{ErrorHandler, LoggingErrorHandler}, - requests::Requester, - types::{AllowedUpdate, Update}, - utils::shutdown_token::shutdown_check_timeout_for, -}; -use dptree::di::{DependencyMap, DependencySupplier}; -use futures::{future::BoxFuture, StreamExt}; -use std::{collections::HashSet, fmt::Debug, ops::ControlFlow, sync::Arc}; -use teloxide_core::{ - requests::{Request, RequesterExt}, - types::UpdateKind, -}; -use tokio::time::timeout; - -use std::future::Future; - -/// The builder for [`Dispatcher`]. -pub struct DispatcherBuilder { - bot: R, - dependencies: DependencyMap, - handler: UpdateHandler, - default_handler: DefaultHandler, - error_handler: Arc + Send + Sync>, -} - -impl DispatcherBuilder -where - R: Clone + Requester + Clone + Send + Sync + 'static, - Err: Debug + Send + Sync + 'static, -{ - /// Specifies a handler that will be called for an unhandled update. - /// - /// By default, it is a mere [`log::warn`]. - #[must_use] - pub fn default_handler(self, handler: H) -> Self - where - H: Fn(Arc) -> Fut + Send + Sync + 'static, - Fut: Future + Send + 'static, - { - let handler = Arc::new(handler); - - Self { - default_handler: Box::new(move |upd| { - let handler = Arc::clone(&handler); - Box::pin(handler(upd)) - }), - ..self - } - } - - /// Specifies a handler that will be called on a handler error. - /// - /// By default, it is [`LoggingErrorHandler`]. - #[must_use] - pub fn error_handler(self, handler: Arc + Send + Sync>) -> Self { - Self { error_handler: handler, ..self } - } - - /// Specifies dependencies that can be used inside of handlers. - /// - /// By default, there is no dependencies. - #[must_use] - pub fn dependencies(self, dependencies: DependencyMap) -> Self { - Self { dependencies, ..self } - } - - /// Constructs [`Dispatcher`]. - #[must_use] - pub fn build(self) -> Dispatcher { - Dispatcher { - bot: self.bot.clone(), - cache_me_bot: self.bot.cache_me(), - dependencies: self.dependencies, - handler: self.handler, - default_handler: self.default_handler, - error_handler: self.error_handler, - allowed_updates: Default::default(), - state: ShutdownToken::new(), - } - } -} - -/// The base for update dispatching. -pub struct Dispatcher { - bot: R, - cache_me_bot: CacheMe, - dependencies: DependencyMap, - - handler: UpdateHandler, - default_handler: DefaultHandler, - error_handler: Arc + Send + Sync>, - // TODO: respect allowed_udpates - allowed_updates: HashSet, - - state: ShutdownToken, -} - -// TODO: it is allowed to return message as response on telegram request in -// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates - -/// A handler that processes updates from Telegram. -pub type UpdateHandler = dptree::Handler<'static, DependencyMap, Result<(), Err>>; - -type DefaultHandler = Box) -> BoxFuture<'static, ()> + Send + Sync>; - -impl Dispatcher -where - R: Requester + Clone + Send + Sync + 'static, - Err: Send + Sync + 'static, -{ - /// Constructs a new [`DispatcherBuilder`] with `bot` and `handler`. - #[must_use] - pub fn builder(bot: R, handler: UpdateHandler) -> DispatcherBuilder - where - Err: Debug, - { - DispatcherBuilder { - bot, - dependencies: DependencyMap::new(), - handler, - default_handler: Box::new(|upd| { - log::warn!("Unhandled update: {:?}", upd); - Box::pin(async {}) - }), - error_handler: LoggingErrorHandler::new(), - } - } - - /// Starts your bot with the default parameters. - /// - /// The default parameters are a long polling update listener and log all - /// errors produced by this listener. - /// - /// Each time a handler is invoked, [`Dispatcher`] adds the following - /// dependencies (in addition to those passed to - /// [`DispatcherBuilder::dependencies`]): - /// - /// - Your bot passed to [`Dispatcher::builder`]; - /// - An update from Telegram; - /// - [`crate::types::Me`] (can be used in [`HandlerExt::filter_command`]). - /// - /// [`shutdown`]: ShutdownToken::shutdown - /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler - /// [`HandlerExt::filter_command`]: crate::dispatching2::HandlerExt::filter_command - pub async fn dispatch(&mut self) - where - R: Requester + Clone, - ::GetUpdates: Send, - { - let listener = update_listeners::polling_default(self.bot.clone()).await; - let error_handler = - LoggingErrorHandler::with_custom_text("An error from the update listener"); - - self.dispatch_with_listener(listener, error_handler).await; - } - - /// Starts your bot with custom `update_listener` and - /// `update_listener_error_handler`. - /// - /// This method adds the same dependencies as [`Dispatcher::dispatch`]. - /// - /// [`shutdown`]: ShutdownToken::shutdown - /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler - pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( - &'a mut self, - mut update_listener: UListener, - update_listener_error_handler: Arc, - ) where - UListener: UpdateListener + 'a, - Eh: ErrorHandler + 'a, - ListenerE: Debug, - { - update_listener.hint_allowed_updates(&mut self.allowed_updates.clone().into_iter()); - - let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); - let mut stop_token = Some(update_listener.stop_token()); - - self.state.start_dispatching(); - - { - let stream = update_listener.as_stream(); - tokio::pin!(stream); - - loop { - // False positive - #[allow(clippy::collapsible_match)] - if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await { - match upd { - None => break, - Some(upd) => self.process_update(upd, &update_listener_error_handler).await, - } - } - - if self.state.is_shutting_down() { - if let Some(token) = stop_token.take() { - log::debug!("Start shutting down dispatching..."); - token.stop(); - break; - } - } - } - } - - // TODO: wait for executing handlers? - - self.state.done(); - } - - async fn process_update( - &self, - update: Result, - err_handler: &Arc, - ) where - LErrHandler: ErrorHandler, - { - match update { - Ok(upd) => { - if let UpdateKind::Error(err) = upd.kind { - log::error!( - "Cannot parse an update.\nError: {:?}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide/issues.", - err, - ); - return; - } - - let mut deps = self.dependencies.clone(); - deps.insert(upd); - deps.insert(self.bot.clone()); - deps.insert( - self.cache_me_bot.get_me().send().await.expect("Failed to retrieve 'me'"), - ); - - match self.handler.dispatch(deps).await { - ControlFlow::Break(Ok(())) => {} - ControlFlow::Break(Err(err)) => { - self.error_handler.clone().handle_error(err).await - } - ControlFlow::Continue(deps) => { - let upd = deps.get(); - (self.default_handler)(upd).await; - } - } - } - Err(err) => err_handler.clone().handle_error(err).await, - } - } - - /// Setups the `^C` handler that [`shutdown`]s dispatching. - /// - /// [`shutdown`]: ShutdownToken::shutdown - #[cfg(feature = "ctrlc_handler")] - pub fn setup_ctrlc_handler(&mut self) -> &mut Self { - let token = self.state.clone(); - tokio::spawn(async move { - loop { - tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); - - match token.shutdown() { - Ok(f) => { - log::info!("^C received, trying to shutdown the dispatcher..."); - f.await; - log::info!("dispatcher is shutdown..."); - } - Err(_) => { - log::info!("^C received, the dispatcher isn't running, ignoring the signal") - } - } - } - }); - - self - } - - /// Returns a shutdown token, which can later be used to shutdown - /// dispatching. - pub fn shutdown_token(&self) -> ShutdownToken { - self.state.clone() - } -} - -#[cfg(test)] -mod tests { - use std::convert::Infallible; - - use teloxide_core::Bot; - - use super::*; - - #[tokio::test] - async fn test_tokio_spawn() { - tokio::spawn(async { - // Just check that this code compiles. - if false { - Dispatcher::<_, Infallible>::builder(Bot::new(""), dptree::entry()) - .build() - .dispatch() - .await; - } - }) - .await - .unwrap(); - } -} diff --git a/src/dispatching2/mod.rs b/src/dispatching2/mod.rs deleted file mode 100644 index dca6166b..00000000 --- a/src/dispatching2/mod.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! A new dispatching model based on [`dptree`]. -//! -//! In teloxide, updates are dispatched by a pipleine. The central type is -//! [`dptree::Handler`] -- it represents a handler of an update; since the API -//! is highly declarative, you can combine handlers with each other via such -//! methods as [`dptree::Handler::chain`] and [`dptree::Handler::branch`]. The -//! former method pipes one handler to another one, whilst the latter creates a -//! new node, as communicated by the name. For more information, please refer to -//! the documentation of [`dptree`]. -//! -//! The pattern itself is called [chain of responsibility], a well-known design -//! technique across OOP developers. But unlike typical object-oriented design, -//! we employ declarative FP-style functions like [`dptree::filter`], -//! [`dptree::filter_map`], and [`dptree::endpoint`]; these functions create -//! special forms of [`dptree::Handler`]; for more information, please refer to -//! their respective documentation. Each of these higher-order functions accept -//! a closure that is made into a handler -- this closure can take any -//! additional parameters, which must be supplied while creating [`Dispatcher`] -//! (see [`DispatcherBuilder::dependencies`]). -//! -//! The [`Dispatcher`] type puts all these things together: it only provides -//! [`Dispatcher::dispatch`] and a handful of other methods. Once you call -//! `.dispatch()`, it will retrieve updates from the Telegram server and pass -//! them to your handler, which is a parameter of [`Dispatcher::builder`]. -//! -//! Let us look at a simple example: -//! -//! -//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/shared_state.rs)) -//! -//! ```no_run -//! use std::sync::atomic::{AtomicU64, Ordering}; -//! -//! use once_cell::sync::Lazy; -//! use teloxide::prelude2::*; -//! -//! static MESSAGES_TOTAL: Lazy = Lazy::new(AtomicU64::default); -//! -//! # #[tokio::main] -//! # async fn main() { -//! pretty_env_logger::init(); -//! log::info!("Starting shared_state_bot..."); -//! -//! let bot = Bot::from_env().auto_send(); -//! -//! let handler = Update::filter_message().branch(dptree::endpoint( -//! |msg: Message, bot: AutoSend| async move { -//! let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed); -//! bot.send_message(msg.chat.id, format!("I received {} messages in total.", previous)) -//! .await?; -//! respond(()) -//! }, -//! )); -//! -//! Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await; -//! # } -//! ``` -//! -//! 1. First, we create the bot: `let bot = Bot::from_env().auto_send()`. -//! 2. Then we construct an update handler. While it is possible to handle all -//! kinds of [`crate::types::Update`], here we are only interested in -//! [`crate::types::Message`]: [`UpdateFilterExt::filter_message`] create a -//! handler object which filters all messages out of a generic update. -//! 3. By doing `.branch(dptree::endpoint(...))`, we set up a custom handling -//! closure that receives `msg: Message` and `bot: AutoSend`. There are -//! called dependencies: `msg` is supplied by -//! [`UpdateFilterExt::filter_message`], while `bot` is supplied by -//! [`Dispatcher`]. -//! -//! That being said, if we receive a message, the dispatcher will call our -//! handler, but if we receive something other than a message (e.g., a channel -//! post), you will see an unhandled update notice in your terminal. -//! -//! This is a very limited example of update pipelining facilities. In more -//! involved scenarios, there are multiple branches and chains; if one element -//! of a chain fails to handle an update, the update will be passed forwards; if -//! no handler succeeds at handling the update, [`Dispatcher`] will invoke a -//! default handler set up via [`DispatcherBuilder::default_handler`]. -//! -//! Update pipelining provides several advantages over the typical `match -//! (update.kind) { ... }` approach: -//! -//! 1. It supports _extension_: e.g., you -//! can define extension filters or some other handlers and then combine them in -//! a single place, thus facilitating loose coupling. -//! 2. Pipelining exhibits a natural syntax for expressing message processing. -//! 3. Lastly, it provides a primitive form of [dependency injection (DI)], -//! which allows you to deal with such objects as a bot and various update types -//! easily. -//! -//! For a more involved example, see [`examples/dispatching2_features.rs`](https://github.com/teloxide/teloxide/blob/master/examples/dispatching2_features.rs). -//! -//! TODO: explain a more involved example with multiple branches. -//! -//! [chain of responsibility]: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern -//! [dependency injection (DI)]: https://en.wikipedia.org/wiki/Dependency_injection - -#[cfg(all(feature = "dispatching2", feature = "ctrlc_handler"))] -pub mod repls; - -pub mod dialogue; -mod dispatcher; -mod filter_ext; -mod handler_ext; -mod handler_factory; - -pub use dispatcher::{Dispatcher, DispatcherBuilder, UpdateHandler}; -pub use filter_ext::{MessageFilterExt, UpdateFilterExt}; -pub use handler_ext::HandlerExt; -pub use handler_factory::HandlerFactory; diff --git a/src/dispatching2/repls/commands_repl.rs b/src/dispatching2/repls/commands_repl.rs deleted file mode 100644 index 488d8313..00000000 --- a/src/dispatching2/repls/commands_repl.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::{ - dispatching::{update_listeners, update_listeners::UpdateListener}, - dispatching2::{HandlerExt, UpdateFilterExt}, - error_handlers::LoggingErrorHandler, - types::Update, - utils::command::BotCommand, -}; -use dptree::di::{DependencyMap, Injectable}; -use std::{fmt::Debug, marker::PhantomData}; -use teloxide_core::requests::Requester; - -/// A [REPL] for commands. -/// -/// All errors from an update listener and handler will be logged. -/// -/// ## Caution -/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, -/// because Telegram disallow multiple requests at the same time from the same -/// bot. -/// -/// ## Dependency requirements -/// -/// - Those of [`HandlerExt::filter_command`]. -/// -/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop -/// [`Dispatcher`]: crate::dispatching::Dispatcher -#[cfg(feature = "ctrlc_handler")] -pub async fn commands_repl<'a, R, Cmd, H, E, Args>(bot: R, handler: H, cmd: PhantomData) -where - Cmd: BotCommand + Send + Sync + 'static, - H: Injectable, Args> + Send + Sync + 'static, - R: Requester + Clone + Send + Sync + 'static, - ::GetUpdates: Send, - E: Debug + Send + Sync + 'static, -{ - let cloned_bot = bot.clone(); - - commands_repl_with_listener( - bot, - handler, - update_listeners::polling_default(cloned_bot).await, - cmd, - ) - .await; -} - -/// Like [`commands_repl`], but with a custom [`UpdateListener`]. -/// -/// All errors from an update listener and handler will be logged. -/// -/// ## Caution -/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, -/// because Telegram disallow multiple requests at the same time from the same -/// bot. -/// -/// ## Dependency requirements -/// -/// - Those of [`HandlerExt::filter_command`]. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`commands_repl`]: crate::dispatching::repls::commands_repl() -/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener -#[cfg(feature = "ctrlc_handler")] -pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>( - bot: R, - handler: H, - listener: L, - _cmd: PhantomData, -) where - Cmd: BotCommand + Send + Sync + 'static, - H: Injectable, Args> + Send + Sync + 'static, - L: UpdateListener + Send + 'a, - ListenerE: Debug + Send + 'a, - R: Requester + Clone + Send + Sync + 'static, - E: Debug + Send + Sync + 'static, -{ - use crate::dispatching2::Dispatcher; - - // Other update types are of no interest to use since this REPL is only for - // commands. See . - let ignore_update = |_upd| Box::pin(async {}); - - let mut dispatcher = Dispatcher::builder( - bot, - Update::filter_message().filter_command::().branch(dptree::endpoint(handler)), - ) - .default_handler(ignore_update) - .build(); - - #[cfg(feature = "ctrlc_handler")] - dispatcher.setup_ctrlc_handler(); - - // To make mutable var from immutable. - let mut dispatcher = dispatcher; - - dispatcher - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; -} diff --git a/src/dispatching2/repls/dialogues_repl.rs b/src/dispatching2/repls/dialogues_repl.rs deleted file mode 100644 index 755d4dd3..00000000 --- a/src/dispatching2/repls/dialogues_repl.rs +++ /dev/null @@ -1,96 +0,0 @@ -use crate::{ - dispatching::{ - dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx, InMemStorageError}, - update_listeners, - update_listeners::UpdateListener, - Dispatcher, UpdateWithCx, - }, - error_handlers::LoggingErrorHandler, -}; -use std::{fmt::Debug, future::Future, sync::Arc}; -use teloxide_core::{requests::Requester, types::Message}; - -/// A [REPL] for dialogues. -/// -/// All errors from an update listener and handler will be logged. This function -/// uses [`InMemStorage`]. -/// -/// # Caution -/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, -/// because Telegram disallow multiple requests at the same time from the same -/// bot. -/// -/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage -#[cfg(feature = "ctrlc_handler")] -pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H) -where - H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, - D: Clone + Default + Send + 'static, - Fut: Future> + Send + 'static, - R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, -{ - let cloned_requester = requester.clone(); - - dialogues_repl_with_listener( - requester, - handler, - update_listeners::polling_default(cloned_requester).await, - ) - .await; -} - -/// Like [`dialogues_repl`], but with a custom [`UpdateListener`]. -/// -/// All errors from an update listener and handler will be logged. This function -/// uses [`InMemStorage`]. -/// -/// # Caution -/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, -/// because Telegram disallow multiple requests at the same time from the same -/// bot. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl() -/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener -/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage -#[cfg(feature = "ctrlc_handler")] -pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( - requester: R, - handler: H, - listener: L, -) where - H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, - D: Clone + Default + Send + 'static, - Fut: Future> + Send + 'static, - L: UpdateListener + Send + 'a, - ListenerE: Debug + Send + 'a, - R: Requester + Send + Clone + 'static, -{ - let handler = Arc::new(handler); - - Dispatcher::new(requester) - .messages_handler(DialogueDispatcher::new( - move |DialogueWithCx { cx, dialogue }: DialogueWithCx< - R, - Message, - D, - InMemStorageError, - >| { - let handler = Arc::clone(&handler); - - async move { - let dialogue = dialogue.expect("std::convert::Infallible"); - handler(cx, dialogue).await - } - }, - )) - .setup_ctrlc_handler() - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; -} diff --git a/src/dispatching2/repls/mod.rs b/src/dispatching2/repls/mod.rs deleted file mode 100644 index ab1e9f5f..00000000 --- a/src/dispatching2/repls/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -//! REPLs for dispatching updates. - -mod commands_repl; -mod repl; - -pub use commands_repl::{commands_repl, commands_repl_with_listener}; -//pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener}; -pub use repl::{repl, repl_with_listener}; diff --git a/src/dispatching2/repls/repl.rs b/src/dispatching2/repls/repl.rs deleted file mode 100644 index 497e4f3a..00000000 --- a/src/dispatching2/repls/repl.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::{ - dispatching::{update_listeners, update_listeners::UpdateListener}, - dispatching2::UpdateFilterExt, - error_handlers::{LoggingErrorHandler, OnError}, - types::Update, -}; -use dptree::di::{DependencyMap, Injectable}; -use std::fmt::Debug; -use teloxide_core::requests::Requester; - -/// A [REPL] for messages. -/// -/// All errors from an update listener and a handler will be logged. -/// -/// # Caution -/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, -/// because Telegram disallow multiple requests at the same time from the same -/// bot. -/// -/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop -/// [`Dispatcher`]: crate::dispatching::Dispatcher -#[cfg(feature = "ctrlc_handler")] -pub async fn repl(bot: R, handler: H) -where - H: Injectable, Args> + Send + Sync + 'static, - Result<(), E>: OnError, - E: Debug + Send + Sync + 'static, - R: Requester + Send + Sync + Clone + 'static, - ::GetUpdates: Send, -{ - let cloned_bot = bot.clone(); - repl_with_listener(bot, handler, update_listeners::polling_default(cloned_bot).await).await; -} - -/// Like [`repl`], but with a custom [`UpdateListener`]. -/// -/// All errors from an update listener and handler will be logged. -/// -/// # Caution -/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, -/// because Telegram disallow multiple requests at the same time from the same -/// bot. -/// -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`repl`]: crate::dispatching::repls::repl() -/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener -#[cfg(feature = "ctrlc_handler")] -pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(bot: R, handler: H, listener: L) -where - H: Injectable, Args> + Send + Sync + 'static, - L: UpdateListener + Send + 'a, - ListenerE: Debug, - Result<(), E>: OnError, - E: Debug + Send + Sync + 'static, - R: Requester + Clone + Send + Sync + 'static, -{ - use crate::dispatching2::Dispatcher; - - // Other update types are of no interest to use since this REPL is only for - // messages. See . - let ignore_update = |_upd| Box::pin(async {}); - - #[allow(unused_mut)] - let mut dispatcher = - Dispatcher::builder(bot, Update::filter_message().branch(dptree::endpoint(handler))) - .default_handler(ignore_update) - .build(); - - #[cfg(feature = "ctrlc_handler")] - dispatcher.setup_ctrlc_handler(); - - dispatcher - .dispatch_with_listener( - listener, - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; -} diff --git a/src/lib.rs b/src/lib.rs index d57f23dd..2ab0e677 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,7 @@ //! //! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/dices.rs)) //! ```no_run -//! use teloxide::prelude2::*; +//! use teloxide::prelude::*; //! //! # #[tokio::main] //! # async fn main() { @@ -15,7 +15,7 @@ //! //! let bot = Bot::from_env().auto_send(); //! -//! teloxide::repls2::repl(bot, |message: Message, bot: AutoSend| async move { +//! teloxide::repl(bot, |message: Message, bot: AutoSend| async move { //! bot.send_dice(message.chat.id).await?; //! respond(()) //! }) @@ -61,25 +61,16 @@ // https://github.com/rust-lang/rust-clippy/issues/7422 #![allow(clippy::nonstandard_macro_braces)] -#[cfg(feature = "ctrlc_handler")] +#[cfg(all(feature = "ctrlc_handler"))] pub use dispatching::repls::{ - commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl, - repl_with_listener, + commands_repl, commands_repl_with_listener, repl, repl_with_listener, }; -#[cfg(all(feature = "dispatching2", feature = "ctrlc_handler"))] -pub use dispatching2::repls as repls2; - mod logging; -// Things from this module is also used for the dispatching2 module. pub mod dispatching; -#[cfg(feature = "dispatching2")] -pub mod dispatching2; pub mod error_handlers; pub mod prelude; -#[cfg(feature = "dispatching2")] -pub mod prelude2; pub mod utils; #[doc(inline)] @@ -88,10 +79,7 @@ pub use teloxide_core::*; #[cfg(feature = "macros")] pub use teloxide_macros as macros; -#[cfg(feature = "dispatching2")] pub use dptree; -#[cfg(feature = "macros")] -pub use teloxide_macros::teloxide; #[cfg(all(feature = "nightly", doctest))] #[cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("../README.md")))] diff --git a/src/prelude.rs b/src/prelude.rs index b8a9d05b..3cbf6ca5 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,27 +1,17 @@ //! Commonly used items. -#![deprecated(note = "Use dispatching2 instead")] -#![allow(deprecated)] - pub use crate::{ error_handlers::{LoggingErrorHandler, OnError}, respond, }; pub use crate::dispatching::{ - dialogue::{ - exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition, - TransitionIn, TransitionOut, - }, - Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx, + dialogue::Dialogue, Dispatcher, HandlerExt as _, MessageFilterExt as _, UpdateFilterExt as _, }; -#[cfg(feature = "macros")] -pub use crate::teloxide; - pub use teloxide_core::types::{ CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, PollAnswer, - PreCheckoutQuery, ShippingQuery, + PreCheckoutQuery, ShippingQuery, Update, }; #[cfg(feature = "auto-send")] @@ -30,9 +20,4 @@ pub use crate::adaptors::AutoSend; #[doc(no_inline)] pub use teloxide_core::prelude::*; -#[cfg(feature = "frunk")] -pub use crate::utils::UpState; - -pub use tokio::sync::mpsc::UnboundedReceiver; - -pub use futures::StreamExt; +pub use dptree::{self, prelude::*}; diff --git a/src/prelude2.rs b/src/prelude2.rs deleted file mode 100644 index 5f703d3b..00000000 --- a/src/prelude2.rs +++ /dev/null @@ -1,26 +0,0 @@ -//! Commonly used items (`dispatching2`). - -pub use crate::{ - error_handlers::{LoggingErrorHandler, OnError}, - respond, -}; - -pub use crate::dispatching2::{ - dialogue::Dialogue, Dispatcher, HandlerExt as _, MessageFilterExt as _, UpdateFilterExt as _, -}; - -#[cfg(feature = "macros")] -pub use crate::teloxide; - -pub use teloxide_core::types::{ - CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, PollAnswer, - PreCheckoutQuery, ShippingQuery, Update, -}; - -#[cfg(feature = "auto-send")] -pub use crate::adaptors::AutoSend; - -#[doc(no_inline)] -pub use teloxide_core::prelude::*; - -pub use dptree::{self, prelude::*}; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index f4aafd4d..d83e0dbb 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -4,7 +4,6 @@ pub mod command; pub mod html; pub mod markdown; pub(crate) mod shutdown_token; -mod up_state; pub use teloxide_core::net::client_from_env; diff --git a/src/utils/shutdown_token.rs b/src/utils/shutdown_token.rs index 3c11c812..3b9a898f 100644 --- a/src/utils/shutdown_token.rs +++ b/src/utils/shutdown_token.rs @@ -12,7 +12,7 @@ use tokio::sync::Notify; use crate::dispatching::update_listeners::UpdateListener; -/// A token which used to shutdown [`Dispatcher`]. +/// A token which used to shutdown [`crate::dispatching::Dispatcher`]. #[derive(Clone)] pub struct ShutdownToken { dispatcher_state: Arc, @@ -20,7 +20,7 @@ pub struct ShutdownToken { } /// This error is returned from [`ShutdownToken::shutdown`] when trying to -/// shutdown an idle [`Dispatcher`]. +/// shutdown an idle [`crate::dispatching::Dispatcher`]. #[derive(Debug)] pub struct IdleShutdownError; diff --git a/src/utils/up_state.rs b/src/utils/up_state.rs deleted file mode 100644 index 6f6385c6..00000000 --- a/src/utils/up_state.rs +++ /dev/null @@ -1,25 +0,0 @@ -#![cfg(feature = "frunk")] - -use frunk::{from_generic, generic::Generic, hlist::h_cons, into_generic, HCons, HNil}; -use std::ops::Add; - -/// Constructs a structure from another structure and a field. -/// -/// Let `X` be a structure of `field1, ..., fieldN`, `Y` be `field1, ..., -/// fieldN, fieldN+1`. Both `X` and `Y` implement [`Generic`]. Then `Y::up(x, -/// fieldN+1)` constructs `Y` from all the fields of `x: X` plus `Y`'s -/// `fieldN+1`. -/// -/// [`Generic`]: https://docs.rs/frunk/latest/frunk/generic/trait.Generic.html -pub trait UpState: Sized { - fn up(src: Src, field: F) -> Self - where - Src: Generic, - Self: Generic::Repr as Add>>::Output>, - ::Repr: Add>, - { - from_generic(into_generic(src) + h_cons(field, HNil)) - } -} - -impl UpState for Dst {}