Remove support for the old dispatching

This commit is contained in:
Hirrolot 2022-03-24 17:25:42 +06:00
parent c35f4f42ce
commit f5653c747d
47 changed files with 631 additions and 2608 deletions

View file

@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## unreleased ## unreleased
### Removed
- The old dispatching system and related stuff: `dispatching`, `utils::UpState`, `prelude`, `repls2`, `crate::{dialogues_repl, dialogues_repl_with_listener}`, and `#[teloxide(...)]`.
### Changed
- Rename `dispatching2` => `dispatching`.
- Rename `prelude2` => `prelude`.
- Move `update_listeners`, `stop_token`, `IdleShutdownError`, and `ShutdownToken` from the old `dispatching` to the new `dispatching`.
- Replace `crate::{commands_repl, commands_repl_with_listener, repl, repl_with_listener}` with those of the new `dispatching`.
## 0.7.2 - 2022-03-23 ## 0.7.2 - 2022-03-23
### Added ### Added

View file

@ -13,16 +13,13 @@ exclude = ["media"]
[features] [features]
# FIXME: remove "cache-me" that was added by mistake here # FIXME: remove "cache-me" that was added by mistake here
default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send", "dispatching2", "cache-me"] default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send", "cache-me"]
dispatching2 = ["dptree", "cache-me"]
sqlite-storage = ["sqlx"] sqlite-storage = ["sqlx"]
redis-storage = ["redis"] redis-storage = ["redis"]
cbor-serializer = ["serde_cbor"] cbor-serializer = ["serde_cbor"]
bincode-serializer = ["bincode"] bincode-serializer = ["bincode"]
frunk- = ["frunk"]
macros = ["teloxide-macros"] macros = ["teloxide-macros"]
ctrlc_handler = ["tokio/signal"] ctrlc_handler = ["tokio/signal"]
@ -44,7 +41,6 @@ full = [
"redis-storage", "redis-storage",
"cbor-serializer", "cbor-serializer",
"bincode-serializer", "bincode-serializer",
"frunk",
"macros", "macros",
"ctrlc_handler", "ctrlc_handler",
"teloxide-core/full", "teloxide-core/full",
@ -59,28 +55,26 @@ full = [
[dependencies] [dependencies]
teloxide-core = { version = "0.4", default-features = false } teloxide-core = { version = "0.4", default-features = false }
teloxide-macros = { version = "0.5.1", optional = true } teloxide-macros = { git = "https://github.com/teloxide/teloxide-macros.git", rev = "144eb73aaf39145bf8f6b57eec5c76730961c2f1", optional = true }
serde_json = "1.0" serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
dptree = { version = "0.1.0", optional = true } dptree = { version = "0.1.0" }
tokio = { version = "1.8", features = ["fs"] } tokio = { version = "1.8", features = ["fs"] }
tokio-util = "0.6" tokio-util = "0.6"
tokio-stream = "0.1"
flurry = "0.3"
log = "0.4" log = "0.4"
bytes = "1.0" bytes = "1.0"
mime = "0.3" mime = "0.3"
derive_more = "0.99" derive_more = "0.99"
thiserror = "1.0" thiserror = "1.0"
async-trait = "0.1"
futures = "0.3.15" futures = "0.3.15"
pin-project = "1.0" pin-project = "1.0"
serde_with_macros = "1.4" serde_with_macros = "1.4"
aquamarine = "0.1.11"
sqlx = { version = "0.5", optional = true, default-features = false, features = [ sqlx = { version = "0.5", optional = true, default-features = false, features = [
"runtime-tokio-native-tls", "runtime-tokio-native-tls",
@ -90,15 +84,11 @@ sqlx = { version = "0.5", optional = true, default-features = false, features =
redis = { version = "0.20", features = ["tokio-comp"], optional = true } redis = { version = "0.20", features = ["tokio-comp"], optional = true }
serde_cbor = { version = "0.11", optional = true } serde_cbor = { version = "0.11", optional = true }
bincode = { version = "1.3", optional = true } bincode = { version = "1.3", optional = true }
frunk = { version = "0.4", optional = true }
aquamarine = "0.1.11"
[dev-dependencies] [dev-dependencies]
smart-default = "0.6.0"
rand = "0.8.3" rand = "0.8.3"
pretty_env_logger = "0.4.0" pretty_env_logger = "0.4.0"
once_cell = "1.9.0" once_cell = "1.9.0"
lazy_static = "1.4.0"
anyhow = "1.0.52" anyhow = "1.0.52"
serde = "1" serde = "1"
serde_json = "1" serde_json = "1"
@ -106,6 +96,7 @@ tokio = { version = "1.8", features = ["fs", "rt-multi-thread", "macros"] }
warp = "0.3.0" warp = "0.3.0"
reqwest = "0.10.4" reqwest = "0.10.4"
chrono = "0.4" chrono = "0.4"
tokio-stream = "0.1"
[package.metadata.docs.rs] [package.metadata.docs.rs]
all-features = true all-features = true
@ -149,5 +140,5 @@ name = "admin"
required-features = ["macros"] required-features = ["macros"]
[[example]] [[example]]
name = "dispatching2_features" name = "dispatching_features"
required-features = ["macros"] required-features = ["macros"]

View file

@ -88,7 +88,7 @@ This bot replies with a dice throw to each received message:
([Full](examples/dices.rs)) ([Full](examples/dices.rs))
```rust,no_run ```rust,no_run
use teloxide::prelude2::*; use teloxide::prelude::*;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -97,7 +97,7 @@ async fn main() {
let bot = Bot::from_env().auto_send(); let bot = Bot::from_env().auto_send();
teloxide::repls2::repl(bot, |message: Message, bot: AutoSend<Bot>| async move { teloxide::repl(bot, |message: Message, bot: AutoSend<Bot>| async move {
bot.send_dice(message.chat.id).await?; bot.send_dice(message.chat.id).await?;
respond(()) respond(())
}) })
@ -125,10 +125,20 @@ Commands are strongly typed and defined declaratively, similar to how we define
([Full](examples/simple_commands.rs)) ([Full](examples/simple_commands.rs))
```rust,no_run ```rust,no_run
use teloxide::{prelude2::*, utils::command::BotCommand}; use teloxide::{prelude::*, utils::command::BotCommand};
use std::error::Error; use std::error::Error;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
log::info!("Starting simple_commands_bot...");
let bot = Bot::from_env().auto_send();
teloxide::commands_repl(bot, answer, Command::ty()).await;
}
#[derive(BotCommand, Clone)] #[derive(BotCommand, Clone)]
#[command(rename = "lowercase", description = "These commands are supported:")] #[command(rename = "lowercase", description = "These commands are supported:")]
enum Command { enum Command {
@ -161,16 +171,6 @@ async fn answer(
Ok(()) Ok(())
} }
#[tokio::main]
async fn main() {
pretty_env_logger::init();
log::info!("Starting simple_commands_bot...");
let bot = Bot::from_env().auto_send();
teloxide::repls2::commands_repl(bot, answer, Command::ty()).await;
}
``` ```
<div align="center"> <div align="center">
@ -190,7 +190,7 @@ Below is a bot that asks you three questions and then sends the answers back to
([Full](examples/dialogue.rs)) ([Full](examples/dialogue.rs))
```rust,ignore ```rust,ignore
use teloxide::{dispatching2::dialogue::InMemStorage, macros::DialogueState, prelude2::*}; use teloxide::{dispatching::dialogue::InMemStorage, macros::DialogueState, prelude::*};
type MyDialogue = Dialogue<State, InMemStorage<State>>; type MyDialogue = Dialogue<State, InMemStorage<State>>;

View file

@ -1,7 +1,7 @@
use std::{error::Error, str::FromStr}; use std::{error::Error, str::FromStr};
use chrono::Duration; use chrono::Duration;
use teloxide::{prelude2::*, types::ChatPermissions, utils::command::BotCommand}; use teloxide::{prelude::*, types::ChatPermissions, utils::command::BotCommand};
// Derive BotCommand to parse text with a command into this enumeration. // Derive BotCommand to parse text with a command into this enumeration.
// //
@ -149,5 +149,5 @@ async fn main() {
let bot = teloxide::Bot::from_env().auto_send(); let bot = teloxide::Bot::from_env().auto_send();
teloxide::repls2::commands_repl(bot, action, Command::ty()).await; teloxide::commands_repl(bot, action, Command::ty()).await;
} }

View file

@ -1,7 +1,7 @@
use std::error::Error; use std::error::Error;
use teloxide::{ use teloxide::{
payloads::SendMessageSetters, payloads::SendMessageSetters,
prelude2::*, prelude::*,
types::{ types::{
InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputMessageContent, InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputMessageContent,
InputMessageContentText, InputMessageContentText,

View file

@ -2,12 +2,12 @@
// Otherwise, the default is Sqlite. // Otherwise, the default is Sqlite.
use teloxide::{ use teloxide::{
dispatching2::dialogue::{ dispatching::dialogue::{
serializer::{Bincode, Json}, serializer::{Bincode, Json},
ErasedStorage, RedisStorage, SqliteStorage, Storage, ErasedStorage, RedisStorage, SqliteStorage, Storage,
}, },
macros::DialogueState, macros::DialogueState,
prelude2::*, prelude::*,
types::Me, types::Me,
utils::command::BotCommand, utils::command::BotCommand,
}; };

View file

@ -13,7 +13,7 @@
// Age: 223 // Age: 223
// Location: Middle-earth // Location: Middle-earth
// ``` // ```
use teloxide::{dispatching2::dialogue::InMemStorage, macros::DialogueState, prelude2::*}; use teloxide::{dispatching::dialogue::InMemStorage, macros::DialogueState, prelude::*};
type MyDialogue = Dialogue<State, InMemStorage<State>>; type MyDialogue = Dialogue<State, InMemStorage<State>>;

View file

@ -1,6 +1,6 @@
// This bot throws a dice on each incoming message. // This bot throws a dice on each incoming message.
use teloxide::prelude2::*; use teloxide::prelude::*;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
@ -9,7 +9,7 @@ async fn main() {
let bot = Bot::from_env().auto_send(); let bot = Bot::from_env().auto_send();
teloxide::repls2::repl(bot, |message: Message, bot: AutoSend<Bot>| async move { teloxide::repl(bot, |message: Message, bot: AutoSend<Bot>| async move {
bot.send_dice(message.chat.id).await?; bot.send_dice(message.chat.id).await?;
respond(()) respond(())
}) })

View file

@ -1,12 +1,10 @@
// This example provide a quick overview of the new features in the // This example provide a quick overview of the new features in the
// `dispatching2` module. // `dispatching` module.
use rand::Rng; use rand::Rng;
// You need to import `prelude2` because `prelude` contains items from the old
// dispatching system, which will be deprecated in the future.
use teloxide::{ use teloxide::{
prelude2::*, prelude::*,
types::{Dice, Update}, types::{Dice, Update},
utils::command::BotCommand, utils::command::BotCommand,
}; };
@ -14,7 +12,7 @@ use teloxide::{
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
pretty_env_logger::init(); pretty_env_logger::init();
log::info!("Starting dispatching2_features_bot..."); log::info!("Starting dispatching_features_bot...");
let bot = Bot::from_env().auto_send(); let bot = Bot::from_env().auto_send();

View file

@ -23,7 +23,7 @@ use teloxide::{
stop_token::AsyncStopToken, stop_token::AsyncStopToken,
update_listeners::{self, StatefulListener}, update_listeners::{self, StatefulListener},
}, },
prelude2::*, prelude::*,
types::Update, types::Update,
}; };
@ -41,7 +41,7 @@ async fn main() {
let bot = Bot::from_env().auto_send(); let bot = Bot::from_env().auto_send();
teloxide::repls2::repl_with_listener( teloxide::repl_with_listener(
bot.clone(), bot.clone(),
|msg: Message, bot: AutoSend<Bot>| async move { |msg: Message, bot: AutoSend<Bot>| async move {
bot.send_message(msg.chat.id, "pong").await?; bot.send_message(msg.chat.id, "pong").await?;

View file

@ -1,5 +1,5 @@
use teloxide::{ use teloxide::{
prelude2::*, prelude::*,
types::{ types::{
InlineQueryResult, InlineQueryResultArticle, InputMessageContent, InputMessageContentText, InlineQueryResult, InlineQueryResultArticle, InputMessageContent, InputMessageContentText,
}, },

View file

@ -6,7 +6,7 @@ use teloxide::{
stop_token::AsyncStopToken, stop_token::AsyncStopToken,
update_listeners::{self, StatefulListener}, update_listeners::{self, StatefulListener},
}, },
prelude2::*, prelude::*,
types::Update, types::Update,
}; };
@ -24,7 +24,7 @@ async fn main() {
let bot = Bot::from_env().auto_send(); let bot = Bot::from_env().auto_send();
teloxide::repls2::repl_with_listener( teloxide::repl_with_listener(
bot.clone(), bot.clone(),
|msg: Message, bot: AutoSend<Bot>| async move { |msg: Message, bot: AutoSend<Bot>| async move {
bot.send_message(msg.chat.id, "pong").await?; bot.send_message(msg.chat.id, "pong").await?;

View file

@ -3,7 +3,7 @@
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use teloxide::prelude2::*; use teloxide::prelude::*;
static MESSAGES_TOTAL: Lazy<AtomicU64> = Lazy::new(AtomicU64::default); static MESSAGES_TOTAL: Lazy<AtomicU64> = Lazy::new(AtomicU64::default);

View file

@ -1,7 +1,17 @@
use teloxide::{prelude2::*, utils::command::BotCommand}; use teloxide::{prelude::*, utils::command::BotCommand};
use std::error::Error; use std::error::Error;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
log::info!("Starting simple_commands_bot...");
let bot = Bot::from_env().auto_send();
teloxide::commands_repl(bot, answer, Command::ty()).await;
}
#[derive(BotCommand, Clone)] #[derive(BotCommand, Clone)]
#[command(rename = "lowercase", description = "These commands are supported:")] #[command(rename = "lowercase", description = "These commands are supported:")]
enum Command { enum Command {
@ -34,13 +44,3 @@ async fn answer(
Ok(()) Ok(())
} }
#[tokio::main]
async fn main() {
pretty_env_logger::init();
log::info!("Starting simple_commands_bot...");
let bot = Bot::from_env().auto_send();
teloxide::repls2::commands_repl(bot, answer, Command::ty()).await;
}

View file

@ -1,291 +0,0 @@
use crate::dispatching::{
dialogue::{
DialogueDispatcherHandler, DialogueStage, DialogueWithCx, GetChatId, InMemStorage, Storage,
},
DispatcherHandler, UpdateWithCx,
};
use std::{fmt::Debug, marker::PhantomData};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use tokio::sync::mpsc;
use crate::dispatching::dialogue::InMemStorageError;
use flurry::HashMap;
use std::sync::{Arc, Mutex};
use teloxide_core::requests::Requester;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A dispatcher of dialogues.
///
/// Note that it implements [`DispatcherHandler`], so you can just put an
/// instance of this dispatcher into the [`Dispatcher`]'s methods.
///
/// Note that when the storage methods [`Storage::remove_dialogue`] and
/// [`Storage::update_dialogue`] are failed, the errors are logged, but a result
/// from [`Storage::get_dialogue`] is provided to a user handler as-is so you
/// can respond to a concrete user with an error description.
///
/// See the [module-level documentation](crate::dispatching::dialogue) for the
/// design overview.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`DispatcherHandler`]: crate::dispatching::DispatcherHandler
#[deprecated(note = "Use dispatching2 instead")]
pub struct DialogueDispatcher<R, D, S, H, Upd> {
storage: Arc<S>,
handler: Arc<H>,
_phantom: PhantomData<Mutex<D>>,
/// A lock-free map to handle updates from the same chat sequentially, but
/// concurrently from different chats.
///
/// A value is the TX part of an unbounded asynchronous MPSC channel. A
/// handler that executes updates from the same chat ID sequentially
/// handles the RX part.
senders: Arc<HashMap<i64, mpsc::UnboundedSender<UpdateWithCx<R, Upd>>>>,
}
impl<R, D, H, Upd> DialogueDispatcher<R, D, InMemStorage<D>, H, Upd>
where
H: DialogueDispatcherHandler<R, Upd, D, InMemStorageError> + Send + Sync + 'static,
Upd: GetChatId + Send + 'static,
D: Default + Send + 'static,
{
/// Creates a dispatcher with the specified `handler` and [`InMemStorage`]
/// (a default storage).
///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[must_use]
pub fn new(handler: H) -> Self {
Self {
storage: InMemStorage::new(),
handler: Arc::new(handler),
senders: Arc::new(HashMap::new()),
_phantom: PhantomData,
}
}
}
impl<R, D, S, H, Upd> DialogueDispatcher<R, D, S, H, Upd>
where
H: DialogueDispatcherHandler<R, Upd, D, S::Error> + Send + Sync + 'static,
Upd: GetChatId + Send + 'static,
D: Default + Send + 'static,
S: Storage<D> + Send + Sync + 'static,
S::Error: Debug + Send + 'static,
{
/// Creates a dispatcher with the specified `handler` and `storage`.
#[must_use]
pub fn with_storage(handler: H, storage: Arc<S>) -> Self {
Self {
storage,
handler: Arc::new(handler),
senders: Arc::new(HashMap::new()),
_phantom: PhantomData,
}
}
#[must_use]
fn new_tx(&self) -> mpsc::UnboundedSender<UpdateWithCx<R, Upd>>
where
R: Requester + Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
let storage = Arc::clone(&self.storage);
let handler = Arc::clone(&self.handler);
let senders = Arc::clone(&self.senders);
tokio::spawn(UnboundedReceiverStream::new(rx).for_each(move |cx: UpdateWithCx<R, Upd>| {
let storage = Arc::clone(&storage);
let handler = Arc::clone(&handler);
let senders = Arc::clone(&senders);
async move {
let chat_id = cx.update.chat_id();
let dialogue =
Arc::clone(&storage).get_dialogue(chat_id).await.map(Option::unwrap_or_default);
match handler.handle(DialogueWithCx { cx, dialogue }).await {
DialogueStage::Next(new_dialogue) => {
if let Err(e) = storage.update_dialogue(chat_id, new_dialogue).await {
log::error!("Storage::update_dialogue failed: {:?}", e);
}
}
DialogueStage::Exit => {
// On the next .poll() call, the spawned future will
// return Poll::Ready, because we are dropping the
// sender right here:
senders.pin().remove(&chat_id);
if let Err(e) = storage.remove_dialogue(chat_id).await {
log::error!("Storage::remove_dialogue failed: {:?}", e);
}
}
}
}
}));
tx
}
}
impl<R, D, S, H, Upd> DispatcherHandler<R, Upd> for DialogueDispatcher<R, D, S, H, Upd>
where
H: DialogueDispatcherHandler<R, Upd, D, S::Error> + Send + Sync + 'static,
Upd: GetChatId + Send + 'static,
D: Default + Send + 'static,
S: Storage<D> + Send + Sync + 'static,
S::Error: Debug + Send + 'static,
R: Requester + Send,
{
fn handle(
self,
updates: mpsc::UnboundedReceiver<UpdateWithCx<R, Upd>>,
) -> BoxFuture<'static, ()>
where
UpdateWithCx<R, Upd>: 'static,
{
let this = Arc::new(self);
UnboundedReceiverStream::new(updates)
.for_each(move |cx| {
let this = Arc::clone(&this);
let chat_id = cx.update.chat_id();
match this.senders.pin().get(&chat_id) {
// An old dialogue
Some(tx) => {
assert!(
tx.send(cx).is_ok(),
"We are not dropping a receiver or call .close() on it"
);
}
None => {
let tx = this.new_tx();
assert!(
tx.send(cx).is_ok(),
"We are not dropping a receiver or call .close() on it"
);
this.senders.pin().insert(chat_id, tx);
}
}
async {}
})
.boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{stream, StreamExt};
use lazy_static::lazy_static;
use teloxide_core::Bot;
use tokio::{
sync::{mpsc, Mutex},
time::Duration,
};
#[tokio::test]
#[allow(deprecated)]
async fn updates_from_same_chat_executed_sequentially() {
#[derive(Debug)]
struct MyUpdate {
chat_id: i64,
unique_number: u32,
}
impl MyUpdate {
fn new(chat_id: i64, unique_number: u32) -> Self {
Self { chat_id, unique_number }
}
}
impl GetChatId for MyUpdate {
fn chat_id(&self) -> i64 {
self.chat_id
}
}
lazy_static! {
static ref SEQ1: Mutex<Vec<u32>> = Mutex::new(Vec::new());
static ref SEQ2: Mutex<Vec<u32>> = Mutex::new(Vec::new());
static ref SEQ3: Mutex<Vec<u32>> = Mutex::new(Vec::new());
}
let dispatcher = DialogueDispatcher::new(
|cx: DialogueWithCx<Bot, MyUpdate, (), InMemStorageError>| async move {
tokio::time::sleep(Duration::from_millis(300)).await;
match cx.cx.update {
MyUpdate { chat_id: 1, unique_number } => {
SEQ1.lock().await.push(unique_number);
}
MyUpdate { chat_id: 2, unique_number } => {
SEQ2.lock().await.push(unique_number);
}
MyUpdate { chat_id: 3, unique_number } => {
SEQ3.lock().await.push(unique_number);
}
_ => unreachable!(),
}
DialogueStage::Next(())
},
);
let updates = stream::iter(
vec![
MyUpdate::new(1, 174),
MyUpdate::new(1, 125),
MyUpdate::new(2, 411),
MyUpdate::new(1, 2),
MyUpdate::new(2, 515),
MyUpdate::new(2, 623),
MyUpdate::new(1, 193),
MyUpdate::new(1, 104),
MyUpdate::new(2, 2222),
MyUpdate::new(2, 737),
MyUpdate::new(3, 72782),
MyUpdate::new(3, 2737),
MyUpdate::new(1, 7),
MyUpdate::new(1, 7778),
MyUpdate::new(3, 5475),
MyUpdate::new(3, 1096),
MyUpdate::new(3, 872),
MyUpdate::new(2, 10),
MyUpdate::new(2, 55456),
MyUpdate::new(3, 5665),
MyUpdate::new(3, 1611),
]
.into_iter()
.map(|update| UpdateWithCx { update, requester: Bot::new("Doesn't matter here") })
.collect::<Vec<UpdateWithCx<Bot, MyUpdate>>>(),
);
let (tx, rx) = mpsc::unbounded_channel();
updates
.for_each(move |update| {
let tx = tx.clone();
async move {
assert!(tx.send(update).is_ok(), "tx.send(update) failed");
}
})
.await;
dispatcher.handle(rx).await;
// Wait until our futures to be finished.
tokio::time::sleep(Duration::from_millis(3000)).await;
assert_eq!(*SEQ1.lock().await, vec![174, 125, 2, 193, 104, 7, 7778]);
assert_eq!(*SEQ2.lock().await, vec![411, 515, 623, 2222, 737, 10, 55456]);
assert_eq!(*SEQ3.lock().await, vec![72782, 2737, 5475, 1096, 872, 5665, 1611]);
}
}

View file

@ -1,41 +0,0 @@
use crate::dispatching::dialogue::{DialogueStage, DialogueWithCx};
use futures::future::BoxFuture;
use std::{future::Future, sync::Arc};
/// An asynchronous handler of an update used in [`DialogueDispatcher`].
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
///
/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher
#[deprecated(note = "Use dispatching2 instead")]
pub trait DialogueDispatcherHandler<R, Upd, D, E> {
#[must_use]
fn handle(
self: Arc<Self>,
cx: DialogueWithCx<R, Upd, D, E>,
) -> BoxFuture<'static, DialogueStage<D>>
where
DialogueWithCx<R, Upd, D, E>: Send + 'static,
R: Send,
Upd: Send,
D: Send,
E: Send;
}
impl<R, Upd, D, E, F, Fut> DialogueDispatcherHandler<R, Upd, D, E> for F
where
F: Fn(DialogueWithCx<R, Upd, D, E>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
{
fn handle(self: Arc<Self>, cx: DialogueWithCx<R, Upd, D, E>) -> BoxFuture<'static, Fut::Output>
where
DialogueWithCx<R, Upd, D, E>: Send + 'static,
R: Send,
Upd: Send,
D: Send,
E: Send,
{
Box::pin(async move { self(cx).await })
}
}

View file

@ -1,40 +0,0 @@
use crate::dispatching::dialogue::TransitionOut;
/// Continue or terminate a dialogue.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq)]
#[deprecated(note = "Use dispatching2 instead")]
pub enum DialogueStage<D> {
Next(D),
Exit,
}
/// Returns a new dialogue state.
///
/// Note the `Dialogue: From<State>` constraint. It means that you don't need to
/// pass `Dialogue` -- you can just pass one of it's states. [`From`] can be
/// conveniently derived by [derive-more].
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
///
/// [`From`]: std::convert::From
/// [derive-more]: https://crates.io/crates/derive_more
#[deprecated(note = "Use dispatching2 instead")]
pub fn next<Dialogue, State, E>(new_state: State) -> TransitionOut<Dialogue, E>
where
Dialogue: From<State>,
{
Ok(DialogueStage::Next(Dialogue::from(new_state)))
}
/// Exits a dialogue.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
#[deprecated(note = "Use dispatching2 instead")]
pub fn exit<D, E>() -> TransitionOut<D, E> {
Ok(DialogueStage::Exit)
}

View file

@ -1,33 +0,0 @@
use crate::dispatching::{dialogue::GetChatId, UpdateWithCx};
use std::fmt::Debug;
use teloxide_core::requests::Requester;
/// A context of a [`DialogueDispatcher`]'s message handler.
///
/// See [the module-level documentation for the design
/// overview](crate::dispatching::dialogue).
///
/// [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher
#[derive(Debug)]
#[deprecated(note = "Use dispatching2 instead")]
pub struct DialogueWithCx<R, Upd, D, E> {
pub cx: UpdateWithCx<R, Upd>,
pub dialogue: Result<D, E>,
}
impl<Upd, R, D, E> DialogueWithCx<R, Upd, D, E> {
/// Creates a new instance with the provided fields.
pub fn new(cx: UpdateWithCx<R, Upd>, dialogue: D) -> Self {
Self { cx, dialogue: Ok(dialogue) }
}
}
impl<Upd, R, D, E> GetChatId for DialogueWithCx<R, Upd, D, E>
where
Upd: GetChatId,
R: Requester,
{
fn chat_id(&self) -> i64 {
self.cx.update.chat_id()
}
}

View file

@ -1,14 +1,20 @@
use crate::types::CallbackQuery;
use teloxide_core::types::Message; use teloxide_core::types::Message;
/// Something that has a chat ID. /// Something that may has a chat ID.
#[deprecated(note = "Use dispatching2 instead")]
pub trait GetChatId { pub trait GetChatId {
#[must_use] #[must_use]
fn chat_id(&self) -> i64; fn chat_id(&self) -> Option<i64>;
} }
impl GetChatId for Message { impl GetChatId for Message {
fn chat_id(&self) -> i64 { fn chat_id(&self) -> Option<i64> {
self.chat.id Some(self.chat.id)
}
}
impl GetChatId for CallbackQuery {
fn chat_id(&self) -> Option<i64> {
self.message.as_ref().map(|mes| mes.chat.id)
} }
} }

View file

@ -1,172 +1,181 @@
//! Dealing with dialogues. //! Support for user dialogues.
//! //!
//! There are three main components: //! The main type is (surprise!) [`Dialogue`]. Under the hood, it is just a
//! wrapper over [`Storage`] and a chat ID. All it does is provides convenient
//! method for manipulating the dialogue state. [`Storage`] is where all
//! dialogue states are stored; it can be either [`InMemStorage`], which is a
//! simple hash map, or database wrappers such as [`SqliteStorage`]. In the
//! latter case, your dialogues are _persistent_, meaning that you can safely
//! restart your bot and all dialogues will remain in the database -- this is a
//! preferred method for production bots.
//! //!
//! 1. Your type `D` (typically an enumeration), implementing [`Transition`]. //! [`examples/dialogue.rs`] clearly demonstrates the typical usage of
//! It is essentially a [FSM]: its variants are possible dialogue states and //! dialogues. Your dialogue state can be represented as an enumeration:
//! [`Transition::react`] is a transition function.
//! //!
//! 2. State types, forming `D`. They implement [`Subtransition`]. //! ```ignore
//! #[derive(DialogueState, Clone)]
//! #[handler_out(anyhow::Result<()>)]
//! pub enum State {
//! #[handler(handle_start)]
//! Start,
//! //!
//! 2. [`Storage<D>`], which encapsulates all the dialogues. //! #[handler(handle_receive_full_name)]
//! ReceiveFullName,
//! //!
//! 3. [`DialogueDispatcher`], which encapsulates your handler, [`Storage<D>`], //! #[handler(handle_receive_age)]
//! and implements [`DispatcherHandler`]. //! ReceiveAge { full_name: String },
//! //!
//! You pass [`DialogueDispatcher`] into [`Dispatcher`]. Every time //! #[handler(handle_receive_location)]
//! [`Dispatcher`] sees an incoming input, it is transferred into //! ReceiveLocation { full_name: String, age: u8 },
//! [`DialogueDispatcher`], and the following steps are executed:
//!
//! 1. If a storage doesn't contain a dialogue from this chat, supply
//! `D::default()` into you handler, otherwise, supply the saved dialogue
//! from this chat.
//! 2. If a handler has returned [`DialogueStage::Exit`], remove the dialogue
//! from the storage, otherwise ([`DialogueStage::Next`]) force the storage to
//! update the dialogue.
//!
//! To avoid boilerplate, teloxide exports these convenient things: the [`next`]
//! and [`exit`] functions, and `#[derive(BotDialogue)]` with
//! `#[teloxide(subtransition)]`. Here's how your dialogues management code
//! skeleton should look like:
//!
//! ```no_run
//! # #[cfg(feature = "macros")] {
//! use std::convert::Infallible;
//!
//! use teloxide::{
//! dispatching::dialogue::{InMemStorageError, Transition},
//! prelude::*,
//! teloxide, RequestError,
//! };
//!
//! #[derive(Clone)]
//! struct _1State;
//! #[derive(Clone)]
//! struct _2State;
//! #[derive(Clone)]
//! struct _3State;
//!
//! type Out = TransitionOut<D, RequestError>;
//!
//! #[teloxide(subtransition)]
//! async fn _1_transition(_state: _1State, _cx: TransitionIn<AutoSend<Bot>>) -> Out {
//! todo!()
//! } //! }
//!
//! #[teloxide(subtransition)]
//! async fn _2_transition(_state: _2State, _cx: TransitionIn<AutoSend<Bot>>) -> Out {
//! todo!()
//! }
//!
//! #[teloxide(subtransition)]
//! async fn _3_transition(_state: _3State, _cx: TransitionIn<AutoSend<Bot>>) -> Out {
//! todo!()
//! }
//!
//! #[derive(Clone, Transition)]
//! enum D {
//! _1(_1State),
//! _2(_2State),
//! _3(_3State),
//! }
//!
//! impl Default for D {
//! fn default() -> Self {
//! Self::_1(_1State)
//! }
//! }
//!
//! type In = DialogueWithCx<AutoSend<Bot>, Message, D, InMemStorageError>;
//!
//! #[tokio::main]
//! async fn main() {
//! pretty_env_logger::init();
//! log::info!("Starting dialogue_bot!");
//!
//! let bot = Bot::from_env().auto_send();
//!
//! Dispatcher::new(bot)
//! .messages_handler(DialogueDispatcher::new(
//! |DialogueWithCx { cx, dialogue }: In| async move {
//! // No panic because of std::convert::Infallible.
//! let dialogue = dialogue.unwrap();
//! dialogue
//! // Instead of () you can pass an arbitrary value, see below.
//! .react(cx, ())
//! .await
//! .expect("Something wrong with the bot!")
//! },
//! ))
//! .dispatch()
//! .await;
//! }
//! # }
//! ``` //! ```
//! //!
//! - `#[teloxide(subtransition)]` implements [`Subtransition`] for the first //! Each state is associated with its respective handler: e.g., when a dialogue
//! argument of a function. //! state is `ReceiveAge`, `handle_receive_age` is invoked:
//! - `#[derive(Transition)]` implements [`Transition`] for `D`, if all the
//! variants implement [`Subtransition`].
//! //!
//! `()` in `.react(cx, ())` is an arbitrary value, which you can pass into //! ```ignore
//! Subtransitions. Just append `ans: T` to the parameters of the //! async fn handle_receive_age(
//! Subtransitions to pass a differen type. //! bot: AutoSend<Bot>,
//! msg: Message,
//! dialogue: MyDialogue,
//! (full_name,): (String,), // Available from `State::ReceiveAge`.
//! ) -> anyhow::Result<()> {
//! match msg.text().map(|text| text.parse::<u8>()) {
//! Some(Ok(age)) => {
//! bot.send_message(msg.chat.id, "What's your location?").await?;
//! dialogue.update(State::ReceiveLocation { full_name, age }).await?;
//! }
//! _ => {
//! bot.send_message(msg.chat.id, "Send me a number.").await?;
//! }
//! }
//! //!
//! See [examples/dialogue_bot] as a real example. //! Ok(())
//! }
//! ```
//! //!
//! [`Transition`]: crate::dispatching::dialogue::Transition //! Variant's fields are passed to state handlers as tuples: `(full_name,):
//! [`Subtransition`]: crate::dispatching::dialogue::Subtransition //! (String,)`. Using [`Dialogue::update`], you can update the dialogue with a
//! [`Transition::react`]: //! new state, in our case -- `State::ReceiveLocation { full_name, age }`. To
//! crate::dispatching::dialogue::Transition::react //! exit the dialogue, just call [`Dialogue::exit`] and it will be removed from
//! [FSM]: https://en.wikipedia.org/wiki/Finite-state_machine //! the inner storage:
//! //!
//! [`Storage<D>`]: crate::dispatching::dialogue::Storage //! ```ignore
//! async fn handle_receive_location(
//! bot: AutoSend<Bot>,
//! msg: Message,
//! dialogue: MyDialogue,
//! (full_name, age): (String, u8), // Available from `State::ReceiveLocation`.
//! ) -> anyhow::Result<()> {
//! match msg.text() {
//! Some(location) => {
//! let message =
//! format!("Full name: {}\nAge: {}\nLocation: {}", full_name, age, location);
//! bot.send_message(msg.chat.id, message).await?;
//! dialogue.exit().await?;
//! }
//! None => {
//! bot.send_message(msg.chat.id, "Send me a text message.").await?;
//! }
//! }
//! //!
//! [`DialogueStage<D>`]: crate::dispatching::dialogue::DialogueStage //! Ok(())
//! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher //! }
//! ```
//! //!
//! [`DialogueStage::Exit`]: //! [`examples/dialogue.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/dialogue.rs
//! crate::dispatching::dialogue::DialogueStage::Exit
//! [`DialogueStage::Next`]: crate::dispatching::dialogue::DialogueStage::Next
//!
//! [`up!`]: crate::up
//! [`next`]: crate::dispatching::dialogue::next
//! [`exit`]: crate::dispatching::dialogue::exit
//!
//! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler
//! [`Dispatcher`]: crate::dispatching::Dispatcher
//! [`Dispatcher::messages_handler`]:
//! crate::dispatching::Dispatcher::messages_handler
//! [`UpdateKind::Message(message)`]: crate::types::UpdateKind::Message
//!
//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot
#![allow(clippy::type_complexity)]
mod dialogue_dispatcher;
mod dialogue_dispatcher_handler;
mod dialogue_stage;
mod dialogue_with_cx;
mod get_chat_id;
mod storage;
mod transition;
pub use dialogue_dispatcher::DialogueDispatcher;
pub use dialogue_dispatcher_handler::DialogueDispatcherHandler;
pub use dialogue_stage::{exit, next, DialogueStage};
pub use dialogue_with_cx::DialogueWithCx;
pub use get_chat_id::GetChatId;
pub use transition::{
Subtransition, SubtransitionOutputType, Transition, TransitionIn, TransitionOut,
};
#[cfg(feature = "macros")]
pub use teloxide_macros::Transition;
#[cfg(feature = "redis-storage")] #[cfg(feature = "redis-storage")]
pub use storage::{RedisStorage, RedisStorageError}; pub use crate::dispatching::dialogue::{RedisStorage, RedisStorageError};
#[cfg(feature = "sqlite-storage")] #[cfg(feature = "sqlite-storage")]
pub use storage::{SqliteStorage, SqliteStorageError}; pub use crate::dispatching::dialogue::{SqliteStorage, SqliteStorageError};
pub use get_chat_id::GetChatId;
pub use storage::*; pub use storage::*;
use std::{marker::PhantomData, sync::Arc};
mod get_chat_id;
mod storage;
/// A handle for controlling dialogue state.
#[derive(Debug)]
pub struct Dialogue<D, S>
where
S: ?Sized,
{
storage: Arc<S>,
chat_id: i64,
_phantom: PhantomData<D>,
}
// `#[derive]` requires generics to implement `Clone`, but `S` is wrapped around
// `Arc`, and `D` is wrapped around PhantomData.
impl<D, S> Clone for Dialogue<D, S>
where
S: ?Sized,
{
fn clone(&self) -> Self {
Dialogue { storage: self.storage.clone(), chat_id: self.chat_id, _phantom: PhantomData }
}
}
impl<D, S> Dialogue<D, S>
where
D: Send + 'static,
S: Storage<D> + ?Sized,
{
/// Constructs a new dialogue with `storage` (where dialogues are stored)
/// and `chat_id` of a current dialogue.
pub fn new(storage: Arc<S>, chat_id: i64) -> Self {
Self { storage, chat_id, _phantom: PhantomData }
}
/// Retrieves the current state of the dialogue or `None` if there is no
/// dialogue.
pub async fn get(&self) -> Result<Option<D>, S::Error> {
self.storage.clone().get_dialogue(self.chat_id).await
}
/// Like [`Dialogue::get`] but returns a default value if there is no
/// dialogue.
pub async fn get_or_default(&self) -> Result<D, S::Error>
where
D: Default,
{
match self.get().await? {
Some(d) => Ok(d),
None => {
self.storage.clone().update_dialogue(self.chat_id, D::default()).await?;
Ok(D::default())
}
}
}
/// Updates the dialogue state.
///
/// The dialogue type `D` must implement `From<State>` to allow implicit
/// conversion from `State` to `D`.
pub async fn update<State>(&self, state: State) -> Result<(), S::Error>
where
D: From<State>,
{
let new_dialogue = state.into();
self.storage.clone().update_dialogue(self.chat_id, new_dialogue).await?;
Ok(())
}
/// Updates the dialogue with a default value.
pub async fn reset(&self) -> Result<(), S::Error>
where
D: Default,
{
self.update(D::default()).await
}
/// Removes the dialogue from the storage provided to [`Dialogue::new`].
pub async fn exit(&self) -> Result<(), S::Error> {
self.storage.clone().remove_dialogue(self.chat_id).await
}
}

View file

@ -1,67 +0,0 @@
use crate::dispatching::{dialogue::DialogueStage, UpdateWithCx};
use futures::future::BoxFuture;
use teloxide_core::types::Message;
/// Represents a transition function of a dialogue FSM.
#[deprecated(note = "Use dispatching2 instead")]
pub trait Transition: Sized {
type Aux;
type Error;
type Requester;
/// Turns itself into another state, depending on the input message.
///
/// `aux` will be passed to each subtransition function.
fn react(
self,
cx: TransitionIn<Self::Requester>,
aux: Self::Aux,
) -> BoxFuture<'static, TransitionOut<Self, Self::Error>>;
}
/// Like [`Transition`], but from `StateN` -> `Dialogue`.
///
/// [`Transition`]: crate::dispatching::dialogue::Transition
#[deprecated(note = "Use dispatching2 instead")]
pub trait Subtransition
where
Self::Dialogue: Transition<Aux = Self::Aux>,
{
type Aux;
type Dialogue;
type Error;
type Requester;
/// Turns itself into another state, depending on the input message.
///
/// `aux` is something that is provided by the call side, for example,
/// message's text.
fn react(
self,
cx: TransitionIn<Self::Requester>,
aux: Self::Aux,
) -> BoxFuture<'static, TransitionOut<Self::Dialogue, Self::Error>>;
}
/// A type returned from a FSM subtransition function.
///
/// Now it is used only inside `#[teloxide(subtransition)]` for type inference.
#[doc(hidden)]
#[deprecated(note = "Use dispatching2 instead")]
pub trait SubtransitionOutputType {
type Output;
type Error;
}
impl<D, E> SubtransitionOutputType for TransitionOut<D, E> {
type Output = D;
type Error = E;
}
/// An input passed into a FSM (sub)transition function.
#[deprecated(note = "Use dispatching2 instead")]
pub type TransitionIn<R> = UpdateWithCx<R, Message>;
/// A type returned from a FSM (sub)transition function.
#[deprecated(note = "Use dispatching2 instead")]
pub type TransitionOut<D, E = crate::RequestError> = Result<DialogueStage<D>, E>;

View file

@ -1,263 +1,158 @@
use std::{fmt::Debug, sync::Arc};
use crate::{ use crate::{
adaptors::CacheMe,
dispatching::{ dispatching::{
stop_token::StopToken, stop_token::StopToken, update_listeners, update_listeners::UpdateListener, ShutdownToken,
update_listeners::{self, UpdateListener},
DispatcherHandler, UpdateWithCx,
}, },
error_handlers::{ErrorHandler, LoggingErrorHandler}, error_handlers::{ErrorHandler, LoggingErrorHandler},
requests::Requester,
types::{AllowedUpdate, Update},
utils::shutdown_token::shutdown_check_timeout_for, utils::shutdown_token::shutdown_check_timeout_for,
}; };
use dptree::di::{DependencyMap, DependencySupplier};
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{future::BoxFuture, StreamExt};
use std::{collections::HashSet, fmt::Debug, ops::ControlFlow, sync::Arc};
use teloxide_core::{ use teloxide_core::{
requests::Requester, requests::{Request, RequesterExt},
types::{ types::UpdateKind,
AllowedUpdate, CallbackQuery, ChatJoinRequest, ChatMemberUpdated, ChosenInlineResult,
InlineQuery, Message, Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update,
UpdateKind,
},
}; };
use tokio::{sync::mpsc, task::JoinHandle, time::timeout}; use tokio::time::timeout;
use crate::utils::shutdown_token::ShutdownToken; use std::future::Future;
type Tx<Upd, R> = Option<mpsc::UnboundedSender<UpdateWithCx<Upd, R>>>; /// The builder for [`Dispatcher`].
pub struct DispatcherBuilder<R, Err> {
bot: R,
dependencies: DependencyMap,
handler: UpdateHandler<Err>,
default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
}
/// One dispatcher to rule them all. impl<R, Err> DispatcherBuilder<R, Err>
/// where
/// See the [module-level documentation](crate::dispatching) for the design R: Clone + Requester + Clone + Send + Sync + 'static,
/// overview. Err: Debug + Send + Sync + 'static,
#[deprecated(note = "Use dispatching2 instead")] {
pub struct Dispatcher<R> { /// Specifies a handler that will be called for an unhandled update.
requester: R, ///
/// By default, it is a mere [`log::warn`].
#[must_use]
pub fn default_handler<H, Fut>(self, handler: H) -> Self
where
H: Fn(Arc<Update>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let handler = Arc::new(handler);
messages_queue: Tx<R, Message>, Self {
edited_messages_queue: Tx<R, Message>, default_handler: Box::new(move |upd| {
channel_posts_queue: Tx<R, Message>, let handler = Arc::clone(&handler);
edited_channel_posts_queue: Tx<R, Message>, Box::pin(handler(upd))
inline_queries_queue: Tx<R, InlineQuery>, }),
chosen_inline_results_queue: Tx<R, ChosenInlineResult>, ..self
callback_queries_queue: Tx<R, CallbackQuery>, }
shipping_queries_queue: Tx<R, ShippingQuery>, }
pre_checkout_queries_queue: Tx<R, PreCheckoutQuery>,
polls_queue: Tx<R, Poll>,
poll_answers_queue: Tx<R, PollAnswer>,
my_chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_join_requests_queue: Tx<R, ChatJoinRequest>,
running_handlers: FuturesUnordered<JoinHandle<()>>, /// Specifies a handler that will be called on a handler error.
///
/// By default, it is [`LoggingErrorHandler`].
#[must_use]
pub fn error_handler(self, handler: Arc<dyn ErrorHandler<Err> + Send + Sync>) -> Self {
Self { error_handler: handler, ..self }
}
/// Specifies dependencies that can be used inside of handlers.
///
/// By default, there is no dependencies.
#[must_use]
pub fn dependencies(self, dependencies: DependencyMap) -> Self {
Self { dependencies, ..self }
}
/// Constructs [`Dispatcher`].
#[must_use]
pub fn build(self) -> Dispatcher<R, Err> {
Dispatcher {
bot: self.bot.clone(),
cache_me_bot: self.bot.cache_me(),
dependencies: self.dependencies,
handler: self.handler,
default_handler: self.default_handler,
error_handler: self.error_handler,
allowed_updates: Default::default(),
state: ShutdownToken::new(),
}
}
}
/// The base for update dispatching.
pub struct Dispatcher<R, Err> {
bot: R,
cache_me_bot: CacheMe<R>,
dependencies: DependencyMap,
handler: UpdateHandler<Err>,
default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
// TODO: respect allowed_udpates
allowed_updates: HashSet<AllowedUpdate>,
state: ShutdownToken, state: ShutdownToken,
} }
impl<R> Dispatcher<R> // TODO: it is allowed to return message as response on telegram request in
// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates
/// A handler that processes updates from Telegram.
pub type UpdateHandler<Err> = dptree::Handler<'static, DependencyMap, Result<(), Err>>;
type DefaultHandler = Box<dyn Fn(Arc<Update>) -> BoxFuture<'static, ()> + Send + Sync>;
impl<R, Err> Dispatcher<R, Err>
where where
R: Send + 'static, R: Requester + Clone + Send + Sync + 'static,
Err: Send + Sync + 'static,
{ {
/// Constructs a new dispatcher with the specified `requester`. /// Constructs a new [`DispatcherBuilder`] with `bot` and `handler`.
#[must_use] #[must_use]
pub fn new(requester: R) -> Self { pub fn builder(bot: R, handler: UpdateHandler<Err>) -> DispatcherBuilder<R, Err>
Self { where
requester, Err: Debug,
messages_queue: None, {
edited_messages_queue: None, DispatcherBuilder {
channel_posts_queue: None, bot,
edited_channel_posts_queue: None, dependencies: DependencyMap::new(),
inline_queries_queue: None, handler,
chosen_inline_results_queue: None, default_handler: Box::new(|upd| {
callback_queries_queue: None, log::warn!("Unhandled update: {:?}", upd);
shipping_queries_queue: None, Box::pin(async {})
pre_checkout_queries_queue: None, }),
polls_queue: None, error_handler: LoggingErrorHandler::new(),
poll_answers_queue: None,
my_chat_members_queue: None,
chat_members_queue: None,
chat_join_requests_queue: None,
running_handlers: FuturesUnordered::new(),
state: ShutdownToken::new(),
} }
} }
#[must_use]
fn new_tx<H, Upd>(&mut self, h: H) -> Tx<R, Upd>
where
H: DispatcherHandler<R, Upd> + Send + 'static,
Upd: Send + 'static,
R: Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
let join_handle = tokio::spawn(h.handle(rx));
self.running_handlers.push(join_handle);
Some(tx)
}
/// Setup the `^C` handler which [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
#[must_use]
pub fn setup_ctrlc_handler(self) -> Self {
let token = self.state.clone();
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
match token.shutdown() {
Ok(f) => {
log::info!("^C received, trying to shutdown the dispatcher...");
f.await;
log::info!("dispatcher is shutdown...");
}
Err(_) => {
log::info!("^C received, the dispatcher isn't running, ignoring the signal")
}
}
}
});
self
}
#[must_use]
pub fn messages_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, Message> + 'static + Send,
{
self.messages_queue = self.new_tx(h);
self
}
#[must_use]
pub fn edited_messages_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, Message> + 'static + Send,
{
self.edited_messages_queue = self.new_tx(h);
self
}
#[must_use]
pub fn channel_posts_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, Message> + 'static + Send,
{
self.channel_posts_queue = self.new_tx(h);
self
}
#[must_use]
pub fn edited_channel_posts_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, Message> + 'static + Send,
{
self.edited_channel_posts_queue = self.new_tx(h);
self
}
#[must_use]
pub fn inline_queries_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, InlineQuery> + 'static + Send,
{
self.inline_queries_queue = self.new_tx(h);
self
}
#[must_use]
pub fn chosen_inline_results_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, ChosenInlineResult> + 'static + Send,
{
self.chosen_inline_results_queue = self.new_tx(h);
self
}
#[must_use]
pub fn callback_queries_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, CallbackQuery> + 'static + Send,
{
self.callback_queries_queue = self.new_tx(h);
self
}
#[must_use]
pub fn shipping_queries_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, ShippingQuery> + 'static + Send,
{
self.shipping_queries_queue = self.new_tx(h);
self
}
#[must_use]
pub fn pre_checkout_queries_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, PreCheckoutQuery> + 'static + Send,
{
self.pre_checkout_queries_queue = self.new_tx(h);
self
}
#[must_use]
pub fn polls_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, Poll> + 'static + Send,
{
self.polls_queue = self.new_tx(h);
self
}
#[must_use]
pub fn poll_answers_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, PollAnswer> + 'static + Send,
{
self.poll_answers_queue = self.new_tx(h);
self
}
#[must_use]
pub fn my_chat_members_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, ChatMemberUpdated> + 'static + Send,
{
self.my_chat_members_queue = self.new_tx(h);
self
}
#[must_use]
pub fn chat_members_handler<H>(mut self, h: H) -> Self
where
H: DispatcherHandler<R, ChatMemberUpdated> + 'static + Send,
{
self.chat_members_queue = self.new_tx(h);
self
}
/// Starts your bot with the default parameters. /// Starts your bot with the default parameters.
/// ///
/// The default parameters are a long polling update listener and log all /// The default parameters are a long polling update listener and log all
/// errors produced by this listener). /// errors produced by this listener.
/// ///
/// Please note that after shutting down (either because of [`shutdown`], /// Each time a handler is invoked, [`Dispatcher`] adds the following
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers /// dependencies (in addition to those passed to
/// will be gone. As such, to restart listening you need to re-add /// [`DispatcherBuilder::dependencies`]):
/// handlers. ///
/// - Your bot passed to [`Dispatcher::builder`];
/// - An update from Telegram;
/// - [`crate::types::Me`] (can be used in [`HandlerExt::filter_command`]).
/// ///
/// [`shutdown`]: ShutdownToken::shutdown /// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
/// [`HandlerExt::filter_command`]: crate::dispatching::HandlerExt::filter_command
pub async fn dispatch(&mut self) pub async fn dispatch(&mut self)
where where
R: Requester + Clone, R: Requester + Clone,
<R as Requester>::GetUpdates: Send, <R as Requester>::GetUpdates: Send,
{ {
let listener = update_listeners::polling_default(self.requester.clone()).await; let listener = update_listeners::polling_default(self.bot.clone()).await;
let error_handler = let error_handler =
LoggingErrorHandler::with_custom_text("An error from the update listener"); LoggingErrorHandler::with_custom_text("An error from the update listener");
@ -267,10 +162,7 @@ where
/// Starts your bot with custom `update_listener` and /// Starts your bot with custom `update_listener` and
/// `update_listener_error_handler`. /// `update_listener_error_handler`.
/// ///
/// Please note that after shutting down (either because of [`shutdown`], /// This method adds the same dependencies as [`Dispatcher::dispatch`].
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
/// ///
/// [`shutdown`]: ShutdownToken::shutdown /// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
@ -282,9 +174,8 @@ where
UListener: UpdateListener<ListenerE> + 'a, UListener: UpdateListener<ListenerE> + 'a,
Eh: ErrorHandler<ListenerE> + 'a, Eh: ErrorHandler<ListenerE> + 'a,
ListenerE: Debug, ListenerE: Debug,
R: Requester + Clone,
{ {
self.hint_allowed_updates(&mut update_listener); update_listener.hint_allowed_updates(&mut self.allowed_updates.clone().into_iter());
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token()); let mut stop_token = Some(update_listener.stop_token());
@ -309,222 +200,111 @@ where
if let Some(token) = stop_token.take() { if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching..."); log::debug!("Start shutting down dispatching...");
token.stop(); token.stop();
break;
} }
} }
} }
} }
self.wait_for_handlers().await; // TODO: wait for executing handlers?
self.state.done(); self.state.done();
} }
async fn process_update<LErr, LErrHandler>(
&self,
update: Result<Update, LErr>,
err_handler: &Arc<LErrHandler>,
) where
LErrHandler: ErrorHandler<LErr>,
{
match update {
Ok(upd) => {
if let UpdateKind::Error(err) = upd.kind {
log::error!(
"Cannot parse an update.\nError: {:?}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide/issues.",
err,
);
return;
}
let mut deps = self.dependencies.clone();
deps.insert(upd);
deps.insert(self.bot.clone());
deps.insert(
self.cache_me_bot.get_me().send().await.expect("Failed to retrieve 'me'"),
);
match self.handler.dispatch(deps).await {
ControlFlow::Break(Ok(())) => {}
ControlFlow::Break(Err(err)) => {
self.error_handler.clone().handle_error(err).await
}
ControlFlow::Continue(deps) => {
let upd = deps.get();
(self.default_handler)(upd).await;
}
}
}
Err(err) => err_handler.clone().handle_error(err).await,
}
}
/// Setups the `^C` handler that [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
pub fn setup_ctrlc_handler(&mut self) -> &mut Self {
let token = self.state.clone();
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
match token.shutdown() {
Ok(f) => {
log::info!("^C received, trying to shutdown the dispatcher...");
f.await;
log::info!("dispatcher is shutdown...");
}
Err(_) => {
log::info!("^C received, the dispatcher isn't running, ignoring the signal")
}
}
}
});
self
}
/// Returns a shutdown token, which can later be used to shutdown /// Returns a shutdown token, which can later be used to shutdown
/// dispatching. /// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken { pub fn shutdown_token(&self) -> ShutdownToken {
self.state.clone() self.state.clone()
} }
}
async fn process_update<ListenerE, Eh>( #[cfg(test)]
&self, mod tests {
update: Result<Update, ListenerE>, use std::convert::Infallible;
update_listener_error_handler: &Arc<Eh>,
) where
R: Requester + Clone,
Eh: ErrorHandler<ListenerE>,
ListenerE: Debug,
{
{
log::trace!("Dispatcher received an update: {:?}", update);
let update = match update { use teloxide_core::Bot;
Ok(update) => update,
Err(error) => {
Arc::clone(update_listener_error_handler).handle_error(error).await;
return;
}
};
match update.kind { use super::*;
UpdateKind::Message(message) => {
send(&self.requester, &self.messages_queue, message, "UpdateKind::Message") #[tokio::test]
} async fn test_tokio_spawn() {
UpdateKind::EditedMessage(message) => send( tokio::spawn(async {
&self.requester, // Just check that this code compiles.
&self.edited_messages_queue, if false {
message, Dispatcher::<_, Infallible>::builder(Bot::new(""), dptree::entry())
"UpdateKind::EditedMessage", .build()
), .dispatch()
UpdateKind::ChannelPost(post) => send( .await;
&self.requester,
&self.channel_posts_queue,
post,
"UpdateKind::ChannelPost",
),
UpdateKind::EditedChannelPost(post) => send(
&self.requester,
&self.edited_channel_posts_queue,
post,
"UpdateKind::EditedChannelPost",
),
UpdateKind::InlineQuery(query) => send(
&self.requester,
&self.inline_queries_queue,
query,
"UpdateKind::InlineQuery",
),
UpdateKind::ChosenInlineResult(result) => send(
&self.requester,
&self.chosen_inline_results_queue,
result,
"UpdateKind::ChosenInlineResult",
),
UpdateKind::CallbackQuery(query) => send(
&self.requester,
&self.callback_queries_queue,
query,
"UpdateKind::CallbackQuer",
),
UpdateKind::ShippingQuery(query) => send(
&self.requester,
&self.shipping_queries_queue,
query,
"UpdateKind::ShippingQuery",
),
UpdateKind::PreCheckoutQuery(query) => send(
&self.requester,
&self.pre_checkout_queries_queue,
query,
"UpdateKind::PreCheckoutQuery",
),
UpdateKind::Poll(poll) => {
send(&self.requester, &self.polls_queue, poll, "UpdateKind::Poll")
}
UpdateKind::PollAnswer(answer) => send(
&self.requester,
&self.poll_answers_queue,
answer,
"UpdateKind::PollAnswer",
),
UpdateKind::MyChatMember(chat_member_updated) => send(
&self.requester,
&self.my_chat_members_queue,
chat_member_updated,
"UpdateKind::MyChatMember",
),
UpdateKind::ChatMember(chat_member_updated) => send(
&self.requester,
&self.chat_members_queue,
chat_member_updated,
"UpdateKind::MyChatMember",
),
UpdateKind::ChatJoinRequest(chat_join_request) => send(
&self.requester,
&self.chat_join_requests_queue,
chat_join_request,
"UpdateKind::ChatJoinRequest",
),
UpdateKind::Error(err) => {
log::error!(
"Cannot parse an update.\nError: {:?}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide/issues.",
err,
);
}
} }
} })
} .await
.unwrap();
fn hint_allowed_updates<E>(&self, listener: &mut impl UpdateListener<E>) {
fn hint_handler_allowed_update<T>(
queue: &Option<T>,
kind: AllowedUpdate,
) -> std::option::IntoIter<AllowedUpdate> {
queue.as_ref().map(|_| kind).into_iter()
}
let mut allowed = hint_handler_allowed_update(&self.messages_queue, AllowedUpdate::Message)
.chain(hint_handler_allowed_update(
&self.edited_messages_queue,
AllowedUpdate::EditedMessage,
))
.chain(hint_handler_allowed_update(
&self.channel_posts_queue,
AllowedUpdate::ChannelPost,
))
.chain(hint_handler_allowed_update(
&self.edited_channel_posts_queue,
AllowedUpdate::EditedChannelPost,
))
.chain(hint_handler_allowed_update(
&self.inline_queries_queue,
AllowedUpdate::InlineQuery,
))
.chain(hint_handler_allowed_update(
&self.chosen_inline_results_queue,
AllowedUpdate::ChosenInlineResult,
))
.chain(hint_handler_allowed_update(
&self.callback_queries_queue,
AllowedUpdate::CallbackQuery,
))
.chain(hint_handler_allowed_update(
&self.shipping_queries_queue,
AllowedUpdate::ShippingQuery,
))
.chain(hint_handler_allowed_update(
&self.pre_checkout_queries_queue,
AllowedUpdate::PreCheckoutQuery,
))
.chain(hint_handler_allowed_update(&self.polls_queue, AllowedUpdate::Poll))
.chain(hint_handler_allowed_update(&self.poll_answers_queue, AllowedUpdate::PollAnswer))
.chain(hint_handler_allowed_update(
&self.my_chat_members_queue,
AllowedUpdate::MyChatMember,
))
.chain(hint_handler_allowed_update(
&self.chat_members_queue,
AllowedUpdate::ChatMember,
));
listener.hint_allowed_updates(&mut allowed);
}
async fn wait_for_handlers(&mut self) {
log::debug!("Waiting for handlers to finish");
// Drop all senders, so handlers can stop
self.messages_queue.take();
self.edited_messages_queue.take();
self.channel_posts_queue.take();
self.edited_channel_posts_queue.take();
self.inline_queries_queue.take();
self.chosen_inline_results_queue.take();
self.callback_queries_queue.take();
self.shipping_queries_queue.take();
self.pre_checkout_queries_queue.take();
self.polls_queue.take();
self.poll_answers_queue.take();
self.my_chat_members_queue.take();
self.chat_members_queue.take();
// Wait untill all handlers finish
self.running_handlers.by_ref().for_each(|_| async {}).await;
}
}
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
} }
} }

View file

@ -1,31 +0,0 @@
use std::future::Future;
use crate::dispatching::{DispatcherHandlerRx, UpdateWithCx};
use futures::future::BoxFuture;
/// An asynchronous handler of a stream of updates used in [`Dispatcher`].
///
/// See the [module-level documentation](crate::dispatching) for the design
/// overview.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[deprecated(note = "Use dispatching2 instead")]
pub trait DispatcherHandler<R, Upd> {
#[must_use]
fn handle(self, updates: DispatcherHandlerRx<R, Upd>) -> BoxFuture<'static, ()>
where
UpdateWithCx<R, Upd>: Send + 'static;
}
impl<R, Upd, F, Fut> DispatcherHandler<R, Upd> for F
where
F: FnOnce(DispatcherHandlerRx<R, Upd>) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
fn handle(self, updates: DispatcherHandlerRx<R, Upd>) -> BoxFuture<'static, ()>
where
UpdateWithCx<R, Upd>: Send + 'static,
{
Box::pin(async move { self(updates).await })
}
}

View file

@ -1,62 +0,0 @@
use crate::{dispatching::UpdateWithCx, utils::command::BotCommand};
use futures::{stream::BoxStream, Stream, StreamExt};
use teloxide_core::types::Message;
/// An extension trait to be used with [`DispatcherHandlerRx`].
///
/// See the [module-level documentation](crate::dispatching) for the design
/// overview.
///
/// [`DispatcherHandlerRx`]: crate::dispatching::DispatcherHandlerRx
#[deprecated(note = "Use dispatching2 instead")]
pub trait DispatcherHandlerRxExt<R> {
/// Extracts only text messages from this stream of arbitrary messages.
fn text_messages(self) -> BoxStream<'static, (UpdateWithCx<R, Message>, String)>
where
Self: Stream<Item = UpdateWithCx<R, Message>>,
R: Send + 'static;
/// Extracts only commands with their arguments from this stream of
/// arbitrary messages.
fn commands<C, N>(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx<R, Message>, C)>
where
Self: Stream<Item = UpdateWithCx<R, Message>>,
C: BotCommand,
N: Into<String> + Send,
R: Send + 'static;
}
impl<R, T> DispatcherHandlerRxExt<R> for T
where
T: Send + 'static,
{
fn text_messages(self) -> BoxStream<'static, (UpdateWithCx<R, Message>, String)>
where
Self: Stream<Item = UpdateWithCx<R, Message>>,
R: Send + 'static,
{
self.filter_map(|cx| async move {
let text = cx.update.text().map(ToOwned::to_owned);
text.map(move |text| (cx, text))
})
.boxed()
}
fn commands<C, N>(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx<R, Message>, C)>
where
Self: Stream<Item = UpdateWithCx<R, Message>>,
C: BotCommand,
N: Into<String> + Send,
R: Send + 'static,
{
let bot_name = bot_name.into();
self.text_messages()
.filter_map(move |(cx, text)| {
let bot_name = bot_name.clone();
async move { C::parse(&text, &bot_name).map(|command| (cx, command)).ok() }
})
.boxed()
}
}

View file

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use crate::{ use crate::{
dispatching2::{ dispatching::{
dialogue::{Dialogue, GetChatId, Storage}, dialogue::{Dialogue, GetChatId, Storage},
HandlerFactory, HandlerFactory,
}, },

View file

@ -1,74 +1,113 @@
//! Old updates dispatching (**DEPRECATED**: use [`crate::dispatching2`] //! An update dispatching model based on [`dptree`].
//! instead).
//! //!
//! The key type here is [`Dispatcher`]. It encapsulates [`Bot`] and handlers //! In teloxide, updates are dispatched by a pipleine. The central type is
//! for [all the update kinds]. //! [`dptree::Handler`] -- it represents a handler of an update; since the API
//! is highly declarative, you can combine handlers with each other via such
//! methods as [`dptree::Handler::chain`] and [`dptree::Handler::branch`]. The
//! former method pipes one handler to another one, whilst the latter creates a
//! new node, as communicated by the name. For more information, please refer to
//! the documentation of [`dptree`].
//! //!
//! Every handler accept [`tokio::sync::mpsc::UnboundedReceiver`] (the RX halve //! The pattern itself is called [chain of responsibility], a well-known design
//! of an asynchronous channel). Inside a body of your handler, you typically //! technique across OOP developers. But unlike typical object-oriented design,
//! asynchronously concurrently iterate through updates like this: //! we employ declarative FP-style functions like [`dptree::filter`],
//! [`dptree::filter_map`], and [`dptree::endpoint`]; these functions create
//! special forms of [`dptree::Handler`]; for more information, please refer to
//! their respective documentation. Each of these higher-order functions accept
//! a closure that is made into a handler -- this closure can take any
//! additional parameters, which must be supplied while creating [`Dispatcher`]
//! (see [`DispatcherBuilder::dependencies`]).
//! //!
//! ``` //! The [`Dispatcher`] type puts all these things together: it only provides
//! [`Dispatcher::dispatch`] and a handful of other methods. Once you call
//! `.dispatch()`, it will retrieve updates from the Telegram server and pass
//! them to your handler, which is a parameter of [`Dispatcher::builder`].
//!
//! Let us look at a simple example:
//!
//!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/shared_state.rs))
//!
//! ```no_run
//! use std::sync::atomic::{AtomicU64, Ordering};
//!
//! use once_cell::sync::Lazy;
//! use teloxide::prelude::*; //! use teloxide::prelude::*;
//! use tokio_stream::wrappers::UnboundedReceiverStream;
//! //!
//! async fn handle_messages(rx: DispatcherHandlerRx<AutoSend<Bot>, Message>) { //! static MESSAGES_TOTAL: Lazy<AtomicU64> = Lazy::new(AtomicU64::default);
//! UnboundedReceiverStream::new(rx) //!
//! .for_each_concurrent(None, |message| async move { //! # #[tokio::main]
//! dbg!(message.update); //! # async fn main() {
//! }) //! pretty_env_logger::init();
//! .await; //! log::info!("Starting shared_state_bot...");
//! } //!
//! let bot = Bot::from_env().auto_send();
//!
//! let handler = Update::filter_message().branch(dptree::endpoint(
//! |msg: Message, bot: AutoSend<Bot>| async move {
//! let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed);
//! bot.send_message(msg.chat.id, format!("I received {} messages in total.", previous))
//! .await?;
//! respond(())
//! },
//! ));
//!
//! Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await;
//! # }
//! ``` //! ```
//! //!
//! When [`Update`] is received from Telegram, [`Dispatcher`] pushes it into an //! 1. First, we create the bot: `let bot = Bot::from_env().auto_send()`.
//! appropriate handler, depending on its kind. That's simple! //! 2. Then we construct an update handler. While it is possible to handle all
//! kinds of [`crate::types::Update`], here we are only interested in
//! [`crate::types::Message`]: [`UpdateFilterExt::filter_message`] create a
//! handler object which filters all messages out of a generic update.
//! 3. By doing `.branch(dptree::endpoint(...))`, we set up a custom handling
//! closure that receives `msg: Message` and `bot: AutoSend<Bot>`. There are
//! called dependencies: `msg` is supplied by
//! [`UpdateFilterExt::filter_message`], while `bot` is supplied by
//! [`Dispatcher`].
//! //!
//! **Note** that handlers must implement [`DispatcherHandler`], which means //! That being said, if we receive a message, the dispatcher will call our
//! that: //! handler, but if we receive something other than a message (e.g., a channel
//! - You are able to supply [`DialogueDispatcher`] as a handler. //! post), you will see an unhandled update notice in your terminal.
//! - You are able to supply functions that accept
//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future<Output = ()>`
//! as a handler.
//! //!
//! Since they implement [`DispatcherHandler`] too. //! This is a very limited example of update pipelining facilities. In more
//! involved scenarios, there are multiple branches and chains; if one element
//! of a chain fails to handle an update, the update will be passed forwards; if
//! no handler succeeds at handling the update, [`Dispatcher`] will invoke a
//! default handler set up via [`DispatcherBuilder::default_handler`].
//! //!
//! [See the examples](https://github.com/teloxide/teloxide/tree/master/examples). //! Update pipelining provides several advantages over the typical `match
//! (update.kind) { ... }` approach:
//! //!
//! [`Dispatcher`]: crate::dispatching::Dispatcher //! 1. It supports _extension_: e.g., you
//! [all the update kinds]: crate::types::UpdateKind //! can define extension filters or some other handlers and then combine them in
//! [`Update`]: crate::types::Update //! a single place, thus facilitating loose coupling.
//! [`ErrorHandler`]: crate::dispatching::ErrorHandler //! 2. Pipelining exhibits a natural syntax for expressing message processing.
//! [`DispatcherHandler`]: crate::dispatching::DispatcherHandler //! 3. Lastly, it provides a primitive form of [dependency injection (DI)],
//! [`DialogueDispatcher`]: crate::dispatching::dialogue::DialogueDispatcher //! which allows you to deal with such objects as a bot and various update types
//! [`DispatcherHandlerResult`]: crate::dispatching::DispatcherHandlerResult //! easily.
//! [`Bot`]: crate::Bot //!
//! [`tokio::sync::mpsc::UnboundedReceiver`]: https://docs.rs/tokio/0.2.11/tokio/sync/mpsc/struct.UnboundedReceiver.html //! For a more involved example, see [`examples/dispatching_features.rs`](https://github.com/teloxide/teloxide/blob/master/examples/dispatching_features.rs).
//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot //!
//! TODO: explain a more involved example with multiple branches.
//!
//! [chain of responsibility]: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
//! [dependency injection (DI)]: https://en.wikipedia.org/wiki/Dependency_injection
#![allow(deprecated)] #[cfg(all(feature = "ctrlc_handler"))]
pub mod repls;
pub mod dialogue; pub mod dialogue;
mod dispatcher;
mod filter_ext;
mod handler_ext;
mod handler_factory;
pub mod stop_token; pub mod stop_token;
pub mod update_listeners; pub mod update_listeners;
#[cfg(feature = "ctrlc_handler")]
pub(crate) mod repls;
mod dispatcher;
mod dispatcher_handler;
mod dispatcher_handler_rx_ext;
mod update_with_cx;
pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken}; pub use crate::utils::shutdown_token::{IdleShutdownError, ShutdownToken};
pub use dispatcher::Dispatcher; pub use dispatcher::{Dispatcher, DispatcherBuilder, UpdateHandler};
pub use dispatcher_handler::DispatcherHandler; pub use filter_ext::{MessageFilterExt, UpdateFilterExt};
pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; pub use handler_ext::HandlerExt;
use tokio::sync::mpsc::UnboundedReceiver; pub use handler_factory::HandlerFactory;
pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType};
/// A type of a stream, consumed by [`Dispatcher`]'s handlers.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[deprecated(note = "Use dispatching2 instead")]
pub type DispatcherHandlerRx<R, Upd> = UnboundedReceiver<UpdateWithCx<R, Upd>>;

View file

@ -1,46 +1,46 @@
use crate::{ use crate::{
dispatching::{ dispatching::{
update_listeners, update_listeners::UpdateListener, Dispatcher, DispatcherHandlerRx, update_listeners, update_listeners::UpdateListener, HandlerExt, UpdateFilterExt,
DispatcherHandlerRxExt, UpdateWithCx,
}, },
error_handlers::{LoggingErrorHandler, OnError}, error_handlers::LoggingErrorHandler,
types::Update,
utils::command::BotCommand, utils::command::BotCommand,
}; };
use futures::StreamExt; use dptree::di::{DependencyMap, Injectable};
use std::{fmt::Debug, future::Future, sync::Arc}; use std::{fmt::Debug, marker::PhantomData};
use teloxide_core::{requests::Requester, types::Message}; use teloxide_core::requests::Requester;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A [REPL] for commands. /// A [REPL] for commands.
/// ///
/// All errors from an update listener and handler will be logged. /// All errors from an update listener and handler will be logged.
/// ///
/// # Caution /// ## Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same /// because Telegram disallow multiple requests at the same time from the same
/// bot. /// bot.
/// ///
/// ## Dependency requirements
///
/// - Those of [`HandlerExt::filter_command`].
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")] #[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl<R, Cmd, H, Fut, HandlerE, N>(requester: R, bot_name: N, handler: H) pub async fn commands_repl<'a, R, Cmd, H, E, Args>(bot: R, handler: H, cmd: PhantomData<Cmd>)
where where
Cmd: BotCommand + Send + 'static, Cmd: BotCommand + Send + Sync + 'static,
H: Fn(UpdateWithCx<R, Message>, Cmd) -> Fut + Send + Sync + 'static, H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
Fut: Future<Output = Result<(), HandlerE>> + Send + 'static, R: Requester + Clone + Send + Sync + 'static,
Result<(), HandlerE>: OnError<HandlerE>,
HandlerE: Debug + Send,
N: Into<String> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdates: Send, <R as Requester>::GetUpdates: Send,
E: Debug + Send + Sync + 'static,
{ {
let cloned_requester = requester.clone(); let cloned_bot = bot.clone();
commands_repl_with_listener( commands_repl_with_listener(
requester, bot,
bot_name,
handler, handler,
update_listeners::polling_default(cloned_requester).await, update_listeners::polling_default(cloned_bot).await,
cmd,
) )
.await; .await;
} }
@ -49,50 +49,55 @@ where
/// ///
/// All errors from an update listener and handler will be logged. /// All errors from an update listener and handler will be logged.
/// ///
/// # Caution /// ## Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same /// because Telegram disallow multiple requests at the same time from the same
/// bot. /// bot.
/// ///
/// ## Dependency requirements
///
/// - Those of [`HandlerExt::filter_command`].
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`commands_repl`]: crate::dispatching::repls::commands_repl() /// [`commands_repl`]: crate::dispatching::repls::commands_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")] #[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>( pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>(
requester: R, bot: R,
bot_name: N,
handler: H, handler: H,
listener: L, listener: L,
_cmd: PhantomData<Cmd>,
) where ) where
Cmd: BotCommand + Send + 'static, Cmd: BotCommand + Send + Sync + 'static,
H: Fn(UpdateWithCx<R, Message>, Cmd) -> Fut + Send + Sync + 'static, H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
Fut: Future<Output = Result<(), HandlerE>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a, L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a, ListenerE: Debug + Send + 'a,
Result<(), HandlerE>: OnError<HandlerE>, R: Requester + Clone + Send + Sync + 'static,
HandlerE: Debug + Send, E: Debug + Send + Sync + 'static,
N: Into<String> + Send + 'static,
R: Requester + Clone + Send + 'static,
{ {
let handler = Arc::new(handler); use crate::dispatching::Dispatcher;
Dispatcher::<R>::new(requester) // Other update types are of no interest to use since this REPL is only for
.messages_handler(move |rx: DispatcherHandlerRx<R, Message>| { // commands. See <https://github.com/teloxide/teloxide/issues/557>.
UnboundedReceiverStream::new(rx).commands::<Cmd, N>(bot_name).for_each_concurrent( let ignore_update = |_upd| Box::pin(async {});
None,
move |(cx, cmd)| {
let handler = Arc::clone(&handler);
async move { let mut dispatcher = Dispatcher::builder(
handler(cx, cmd).await.log_on_error().await; bot,
} Update::filter_message().filter_command::<Cmd>().branch(dptree::endpoint(handler)),
}, )
) .default_handler(ignore_update)
}) .build();
.setup_ctrlc_handler()
#[cfg(feature = "ctrlc_handler")]
dispatcher.setup_ctrlc_handler();
// To make mutable var from immutable.
let mut dispatcher = dispatcher;
dispatcher
.dispatch_with_listener( .dispatch_with_listener(
listener, listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"), LoggingErrorHandler::with_custom_text("An error from the update listener"),
) )
.await .await;
} }

View file

@ -30,7 +30,7 @@ where
D: Clone + Default + Send + 'static, D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static, Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static, R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdates: Send, <R as Requester>::GetUpdatesFaultTolerant: Send,
{ {
let cloned_requester = requester.clone(); let cloned_requester = requester.clone();

View file

@ -1,7 +1,8 @@
//! REPLs for dispatching updates.
mod commands_repl; mod commands_repl;
mod dialogues_repl;
mod repl; mod repl;
pub use commands_repl::{commands_repl, commands_repl_with_listener}; pub use commands_repl::{commands_repl, commands_repl_with_listener};
pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener}; //pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener};
pub use repl::{repl, repl_with_listener}; pub use repl::{repl, repl_with_listener};

View file

@ -1,14 +1,11 @@
use crate::{ use crate::{
dispatching::{ dispatching::{update_listeners, update_listeners::UpdateListener, UpdateFilterExt},
update_listeners, update_listeners::UpdateListener, Dispatcher, DispatcherHandlerRx,
UpdateWithCx,
},
error_handlers::{LoggingErrorHandler, OnError}, error_handlers::{LoggingErrorHandler, OnError},
types::Update,
}; };
use futures::StreamExt; use dptree::di::{DependencyMap, Injectable};
use std::{fmt::Debug, future::Future, sync::Arc}; use std::fmt::Debug;
use teloxide_core::{requests::Requester, types::Message}; use teloxide_core::requests::Requester;
use tokio_stream::wrappers::UnboundedReceiverStream;
/// A [REPL] for messages. /// A [REPL] for messages.
/// ///
@ -22,22 +19,16 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")] #[cfg(feature = "ctrlc_handler")]
pub async fn repl<R, H, Fut, E>(requester: R, handler: H) pub async fn repl<R, H, E, Args>(bot: R, handler: H)
where where
H: Fn(UpdateWithCx<R, Message>) -> Fut + Send + Sync + 'static, H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
Result<(), E>: OnError<E>, Result<(), E>: OnError<E>,
E: Debug + Send, E: Debug + Send + Sync + 'static,
R: Requester + Send + Clone + 'static, R: Requester + Send + Sync + Clone + 'static,
<R as Requester>::GetUpdates: Send, <R as Requester>::GetUpdates: Send,
{ {
let cloned_requester = requester.clone(); let cloned_bot = bot.clone();
repl_with_listener( repl_with_listener(bot, handler, update_listeners::polling_default(cloned_bot).await).await;
requester,
handler,
update_listeners::polling_default(cloned_requester).await,
)
.await;
} }
/// Like [`repl`], but with a custom [`UpdateListener`]. /// Like [`repl`], but with a custom [`UpdateListener`].
@ -53,44 +44,34 @@ where
/// [`repl`]: crate::dispatching::repls::repl() /// [`repl`]: crate::dispatching::repls::repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")] #[cfg(feature = "ctrlc_handler")]
pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>( pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(bot: R, handler: H, listener: L)
requester: R, where
handler: H, H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
listener: L,
) where
H: Fn(UpdateWithCx<R, Message>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), E>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a, L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug, ListenerE: Debug,
Result<(), E>: OnError<E>, Result<(), E>: OnError<E>,
E: Debug + Send, E: Debug + Send + Sync + 'static,
R: Requester + Clone + Send + 'static, R: Requester + Clone + Send + Sync + 'static,
{ {
let handler = Arc::new(handler); use crate::dispatching::Dispatcher;
Dispatcher::new(requester) // Other update types are of no interest to use since this REPL is only for
.messages_handler(|rx: DispatcherHandlerRx<R, Message>| { // messages. See <https://github.com/teloxide/teloxide/issues/557>.
UnboundedReceiverStream::new(rx).for_each_concurrent(None, move |message| { let ignore_update = |_upd| Box::pin(async {});
let handler = Arc::clone(&handler);
async move { #[allow(unused_mut)]
handler(message).await.log_on_error().await; let mut dispatcher =
} Dispatcher::builder(bot, Update::filter_message().branch(dptree::endpoint(handler)))
}) .default_handler(ignore_update)
}) .build();
.setup_ctrlc_handler()
#[cfg(feature = "ctrlc_handler")]
dispatcher.setup_ctrlc_handler();
dispatcher
.dispatch_with_listener( .dispatch_with_listener(
listener, listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"), LoggingErrorHandler::with_custom_text("An error from the update listener"),
) )
.await; .await;
} }
#[test]
fn repl_is_send() {
let bot = crate::Bot::new("");
let repl = crate::repl(bot, |_| async { crate::respond(()) });
assert_send(&repl);
fn assert_send(_: &impl Send) {}
}

View file

@ -1,248 +0,0 @@
use crate::dispatching::dialogue::GetChatId;
use teloxide_core::{
payloads::{
SendAnimationSetters, SendAudioSetters, SendContactSetters, SendDocumentSetters,
SendLocationSetters, SendMediaGroupSetters, SendMessageSetters, SendPhotoSetters,
SendStickerSetters, SendVenueSetters, SendVideoNoteSetters, SendVideoSetters,
SendVoiceSetters,
},
requests::{Request, Requester},
types::{ChatId, InputFile, InputMedia, Message},
};
/// A [`Dispatcher`]'s handler's context of a bot and an update.
///
/// See the [module-level documentation](crate::dispatching) for the design
/// overview.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[derive(Debug)]
#[deprecated(note = "Use dispatching2 instead")]
pub struct UpdateWithCx<R, Upd> {
pub requester: R,
pub update: Upd,
}
impl<Upd, R> GetChatId for UpdateWithCx<R, Upd>
where
Upd: GetChatId,
{
fn chat_id(&self) -> i64 {
self.update.chat_id()
}
}
#[doc(hidden)]
// Now it is used only inside `#[teloxide(subtransition)]` for type inference.
pub trait UpdateWithCxRequesterType {
type Requester;
}
impl<R, Upd> UpdateWithCxRequesterType for UpdateWithCx<R, Upd> {
type Requester = R;
}
impl<R> UpdateWithCx<R, Message>
where
R: Requester,
{
/// A shortcut for `.answer(text).send().await`.
#[deprecated(note = "Use .answer(text).await instead")]
pub async fn answer_str<T>(&self, text: T) -> Result<Message, R::Err>
where
T: Into<String>,
R::SendMessage: std::future::Future,
{
self.answer(text).send().await
}
pub fn answer<T>(&self, text: T) -> R::SendMessage
where
T: Into<String>,
{
self.requester.send_message(self.chat_id(), text)
}
pub fn reply_to<T>(&self, text: T) -> R::SendMessage
where
T: Into<String>,
{
self.requester.send_message(self.chat_id(), text).reply_to_message_id(self.update.id)
}
pub fn reply_audio(&self, audio: InputFile) -> R::SendAudio {
self.requester.send_audio(self.update.chat.id, audio).reply_to_message_id(self.update.id)
}
pub fn reply_animation(&self, animation: InputFile) -> R::SendAnimation {
self.requester
.send_animation(self.update.chat.id, animation)
.reply_to_message_id(self.update.id)
}
pub fn reply_document(&self, document: InputFile) -> R::SendDocument {
self.requester
.send_document(self.update.chat.id, document)
.reply_to_message_id(self.update.id)
}
pub fn reply_photo(&self, photo: InputFile) -> R::SendPhoto {
self.requester.send_photo(self.update.chat.id, photo).reply_to_message_id(self.update.id)
}
pub fn reply_video(&self, video: InputFile) -> R::SendVideo {
self.requester.send_video(self.update.chat.id, video).reply_to_message_id(self.update.id)
}
pub fn reply_voice(&self, voice: InputFile) -> R::SendVoice {
self.requester.send_voice(self.update.chat.id, voice).reply_to_message_id(self.update.id)
}
pub fn reply_media_group<T>(&self, media_group: T) -> R::SendMediaGroup
where
T: IntoIterator<Item = InputMedia>,
{
self.requester
.send_media_group(self.update.chat.id, media_group)
.reply_to_message_id(self.update.id)
}
pub fn reply_location(&self, latitude: f64, longitude: f64) -> R::SendLocation {
self.requester
.send_location(self.update.chat.id, latitude, longitude)
.reply_to_message_id(self.update.id)
}
pub fn reply_venue<T, U>(
&self,
latitude: f64,
longitude: f64,
title: T,
address: U,
) -> R::SendVenue
where
T: Into<String>,
U: Into<String>,
{
self.requester
.send_venue(self.update.chat.id, latitude, longitude, title, address)
.reply_to_message_id(self.update.id)
}
pub fn reply_video_note(&self, video_note: InputFile) -> R::SendVideoNote {
self.requester
.send_video_note(self.update.chat.id, video_note)
.reply_to_message_id(self.update.id)
}
pub fn reply_contact<T, U>(&self, phone_number: T, first_name: U) -> R::SendContact
where
T: Into<String>,
U: Into<String>,
{
self.requester
.send_contact(self.update.chat.id, phone_number, first_name)
.reply_to_message_id(self.update.id)
}
pub fn reply_sticker(&self, sticker: InputFile) -> R::SendSticker {
self.requester
.send_sticker(self.update.chat.id, sticker)
.reply_to_message_id(self.update.id)
}
pub fn answer_photo(&self, photo: InputFile) -> R::SendPhoto {
self.requester.send_photo(self.update.chat.id, photo)
}
pub fn answer_audio(&self, audio: InputFile) -> R::SendAudio {
self.requester.send_audio(self.update.chat.id, audio)
}
pub fn answer_animation(&self, animation: InputFile) -> R::SendAnimation {
self.requester.send_animation(self.update.chat.id, animation)
}
pub fn answer_document(&self, document: InputFile) -> R::SendDocument {
self.requester.send_document(self.update.chat.id, document)
}
pub fn answer_video(&self, video: InputFile) -> R::SendVideo {
self.requester.send_video(self.update.chat.id, video)
}
pub fn answer_voice(&self, voice: InputFile) -> R::SendVoice {
self.requester.send_voice(self.update.chat.id, voice)
}
pub fn answer_media_group<T>(&self, media_group: T) -> R::SendMediaGroup
where
T: IntoIterator<Item = InputMedia>,
{
self.requester.send_media_group(self.update.chat.id, media_group)
}
pub fn answer_location(&self, latitude: f64, longitude: f64) -> R::SendLocation {
self.requester.send_location(self.update.chat.id, latitude, longitude)
}
pub fn answer_venue<T, U>(
&self,
latitude: f64,
longitude: f64,
title: T,
address: U,
) -> R::SendVenue
where
T: Into<String>,
U: Into<String>,
{
self.requester.send_venue(self.update.chat.id, latitude, longitude, title, address)
}
pub fn answer_video_note(&self, video_note: InputFile) -> R::SendVideoNote {
self.requester.send_video_note(self.update.chat.id, video_note)
}
pub fn answer_contact<T, U>(&self, phone_number: T, first_name: U) -> R::SendContact
where
T: Into<String>,
U: Into<String>,
{
self.requester.send_contact(self.chat_id(), phone_number, first_name)
}
pub fn answer_sticker(&self, sticker: InputFile) -> R::SendSticker {
self.requester.send_sticker(self.update.chat.id, sticker)
}
pub fn forward_to<T>(&self, chat_id: T) -> R::ForwardMessage
where
T: Into<ChatId>,
{
self.requester.forward_message(chat_id, self.update.chat.id, self.update.id)
}
pub fn edit_message_text<T>(&self, text: T) -> R::EditMessageText
where
T: Into<String>,
{
self.requester.edit_message_text(self.update.chat.id, self.update.id, text)
}
pub fn edit_message_caption(&self) -> R::EditMessageCaption {
self.requester.edit_message_caption(self.update.chat.id, self.update.id)
}
pub fn delete_message(&self) -> R::DeleteMessage {
self.requester.delete_message(self.update.chat.id, self.update.id)
}
pub fn pin_message(&self) -> R::PinChatMessage {
self.requester.pin_chat_message(self.update.chat.id, self.update.id)
}
pub fn answer_dice(&self) -> R::SendDice {
self.requester.send_dice(self.update.chat.id)
}
}

View file

@ -1,20 +0,0 @@
use crate::types::CallbackQuery;
use teloxide_core::types::Message;
/// Something that may has a chat ID.
pub trait GetChatId {
#[must_use]
fn chat_id(&self) -> Option<i64>;
}
impl GetChatId for Message {
fn chat_id(&self) -> Option<i64> {
Some(self.chat.id)
}
}
impl GetChatId for CallbackQuery {
fn chat_id(&self) -> Option<i64> {
self.message.as_ref().map(|mes| mes.chat.id)
}
}

View file

@ -1,182 +0,0 @@
//! Support for user dialogues.
//!
//! The main type is (surprise!) [`Dialogue`]. Under the hood, it is just a
//! wrapper over [`Storage`] and a chat ID. All it does is provides convenient
//! method for manipulating the dialogue state. [`Storage`] is where all
//! dialogue states are stored; it can be either [`InMemStorage`], which is a
//! simple hash map, or database wrappers such as [`SqliteStorage`]. In the
//! latter case, your dialogues are _persistent_, meaning that you can safely
//! restart your bot and all dialogues will remain in the database -- this is a
//! preferred method for production bots.
//!
//! [`examples/dialogue.rs`] clearly demonstrates the typical usage of
//! dialogues. Your dialogue state can be represented as an enumeration:
//!
//! ```ignore
//! #[derive(DialogueState, Clone)]
//! #[handler_out(anyhow::Result<()>)]
//! pub enum State {
//! #[handler(handle_start)]
//! Start,
//!
//! #[handler(handle_receive_full_name)]
//! ReceiveFullName,
//!
//! #[handler(handle_receive_age)]
//! ReceiveAge { full_name: String },
//!
//! #[handler(handle_receive_location)]
//! ReceiveLocation { full_name: String, age: u8 },
//! }
//! ```
//!
//! Each state is associated with its respective handler: e.g., when a dialogue
//! state is `ReceiveAge`, `handle_receive_age` is invoked:
//!
//! ```ignore
//! async fn handle_receive_age(
//! bot: AutoSend<Bot>,
//! msg: Message,
//! dialogue: MyDialogue,
//! (full_name,): (String,), // Available from `State::ReceiveAge`.
//! ) -> anyhow::Result<()> {
//! match msg.text().map(|text| text.parse::<u8>()) {
//! Some(Ok(age)) => {
//! bot.send_message(msg.chat.id, "What's your location?").await?;
//! dialogue.update(State::ReceiveLocation { full_name, age }).await?;
//! }
//! _ => {
//! bot.send_message(msg.chat.id, "Send me a number.").await?;
//! }
//! }
//!
//! Ok(())
//! }
//! ```
//!
//! Variant's fields are passed to state handlers as tuples: `(full_name,):
//! (String,)`. Using [`Dialogue::update`], you can update the dialogue with a
//! new state, in our case -- `State::ReceiveLocation { full_name, age }`. To
//! exit the dialogue, just call [`Dialogue::exit`] and it will be removed from
//! the inner storage:
//!
//! ```ignore
//! async fn handle_receive_location(
//! bot: AutoSend<Bot>,
//! msg: Message,
//! dialogue: MyDialogue,
//! (full_name, age): (String, u8), // Available from `State::ReceiveLocation`.
//! ) -> anyhow::Result<()> {
//! match msg.text() {
//! Some(location) => {
//! let message =
//! format!("Full name: {}\nAge: {}\nLocation: {}", full_name, age, location);
//! bot.send_message(msg.chat.id, message).await?;
//! dialogue.exit().await?;
//! }
//! None => {
//! bot.send_message(msg.chat.id, "Send me a text message.").await?;
//! }
//! }
//!
//! Ok(())
//! }
//! ```
//!
//! [`examples/dialogue.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/dialogue.rs
#[cfg(feature = "redis-storage")]
pub use crate::dispatching::dialogue::{RedisStorage, RedisStorageError};
#[cfg(feature = "sqlite-storage")]
pub use crate::dispatching::dialogue::{SqliteStorage, SqliteStorageError};
pub use crate::dispatching::dialogue::{
serializer, ErasedStorage, InMemStorage, InMemStorageError, Serializer, Storage, TraceStorage,
};
pub use get_chat_id::GetChatId;
use std::{marker::PhantomData, sync::Arc};
mod get_chat_id;
/// A handle for controlling dialogue state.
#[derive(Debug)]
pub struct Dialogue<D, S>
where
S: ?Sized,
{
storage: Arc<S>,
chat_id: i64,
_phantom: PhantomData<D>,
}
// `#[derive]` requires generics to implement `Clone`, but `S` is wrapped around
// `Arc`, and `D` is wrapped around PhantomData.
impl<D, S> Clone for Dialogue<D, S>
where
S: ?Sized,
{
fn clone(&self) -> Self {
Dialogue { storage: self.storage.clone(), chat_id: self.chat_id, _phantom: PhantomData }
}
}
impl<D, S> Dialogue<D, S>
where
D: Send + 'static,
S: Storage<D> + ?Sized,
{
/// Constructs a new dialogue with `storage` (where dialogues are stored)
/// and `chat_id` of a current dialogue.
pub fn new(storage: Arc<S>, chat_id: i64) -> Self {
Self { storage, chat_id, _phantom: PhantomData }
}
/// Retrieves the current state of the dialogue or `None` if there is no
/// dialogue.
pub async fn get(&self) -> Result<Option<D>, S::Error> {
self.storage.clone().get_dialogue(self.chat_id).await
}
/// Like [`Dialogue::get`] but returns a default value if there is no
/// dialogue.
pub async fn get_or_default(&self) -> Result<D, S::Error>
where
D: Default,
{
match self.get().await? {
Some(d) => Ok(d),
None => {
self.storage.clone().update_dialogue(self.chat_id, D::default()).await?;
Ok(D::default())
}
}
}
/// Updates the dialogue state.
///
/// The dialogue type `D` must implement `From<State>` to allow implicit
/// conversion from `State` to `D`.
pub async fn update<State>(&self, state: State) -> Result<(), S::Error>
where
D: From<State>,
{
let new_dialogue = state.into();
self.storage.clone().update_dialogue(self.chat_id, new_dialogue).await?;
Ok(())
}
/// Updates the dialogue with a default value.
pub async fn reset(&self) -> Result<(), S::Error>
where
D: Default,
{
self.update(D::default()).await
}
/// Removes the dialogue from the storage provided to [`Dialogue::new`].
pub async fn exit(&self) -> Result<(), S::Error> {
self.storage.clone().remove_dialogue(self.chat_id).await
}
}

View file

@ -1,310 +0,0 @@
use crate::{
adaptors::CacheMe,
dispatching::{
stop_token::StopToken, update_listeners, update_listeners::UpdateListener, ShutdownToken,
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
requests::Requester,
types::{AllowedUpdate, Update},
utils::shutdown_token::shutdown_check_timeout_for,
};
use dptree::di::{DependencyMap, DependencySupplier};
use futures::{future::BoxFuture, StreamExt};
use std::{collections::HashSet, fmt::Debug, ops::ControlFlow, sync::Arc};
use teloxide_core::{
requests::{Request, RequesterExt},
types::UpdateKind,
};
use tokio::time::timeout;
use std::future::Future;
/// The builder for [`Dispatcher`].
pub struct DispatcherBuilder<R, Err> {
bot: R,
dependencies: DependencyMap,
handler: UpdateHandler<Err>,
default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
}
impl<R, Err> DispatcherBuilder<R, Err>
where
R: Clone + Requester + Clone + Send + Sync + 'static,
Err: Debug + Send + Sync + 'static,
{
/// Specifies a handler that will be called for an unhandled update.
///
/// By default, it is a mere [`log::warn`].
#[must_use]
pub fn default_handler<H, Fut>(self, handler: H) -> Self
where
H: Fn(Arc<Update>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let handler = Arc::new(handler);
Self {
default_handler: Box::new(move |upd| {
let handler = Arc::clone(&handler);
Box::pin(handler(upd))
}),
..self
}
}
/// Specifies a handler that will be called on a handler error.
///
/// By default, it is [`LoggingErrorHandler`].
#[must_use]
pub fn error_handler(self, handler: Arc<dyn ErrorHandler<Err> + Send + Sync>) -> Self {
Self { error_handler: handler, ..self }
}
/// Specifies dependencies that can be used inside of handlers.
///
/// By default, there is no dependencies.
#[must_use]
pub fn dependencies(self, dependencies: DependencyMap) -> Self {
Self { dependencies, ..self }
}
/// Constructs [`Dispatcher`].
#[must_use]
pub fn build(self) -> Dispatcher<R, Err> {
Dispatcher {
bot: self.bot.clone(),
cache_me_bot: self.bot.cache_me(),
dependencies: self.dependencies,
handler: self.handler,
default_handler: self.default_handler,
error_handler: self.error_handler,
allowed_updates: Default::default(),
state: ShutdownToken::new(),
}
}
}
/// The base for update dispatching.
pub struct Dispatcher<R, Err> {
bot: R,
cache_me_bot: CacheMe<R>,
dependencies: DependencyMap,
handler: UpdateHandler<Err>,
default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
// TODO: respect allowed_udpates
allowed_updates: HashSet<AllowedUpdate>,
state: ShutdownToken,
}
// TODO: it is allowed to return message as response on telegram request in
// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates
/// A handler that processes updates from Telegram.
pub type UpdateHandler<Err> = dptree::Handler<'static, DependencyMap, Result<(), Err>>;
type DefaultHandler = Box<dyn Fn(Arc<Update>) -> BoxFuture<'static, ()> + Send + Sync>;
impl<R, Err> Dispatcher<R, Err>
where
R: Requester + Clone + Send + Sync + 'static,
Err: Send + Sync + 'static,
{
/// Constructs a new [`DispatcherBuilder`] with `bot` and `handler`.
#[must_use]
pub fn builder(bot: R, handler: UpdateHandler<Err>) -> DispatcherBuilder<R, Err>
where
Err: Debug,
{
DispatcherBuilder {
bot,
dependencies: DependencyMap::new(),
handler,
default_handler: Box::new(|upd| {
log::warn!("Unhandled update: {:?}", upd);
Box::pin(async {})
}),
error_handler: LoggingErrorHandler::new(),
}
}
/// Starts your bot with the default parameters.
///
/// The default parameters are a long polling update listener and log all
/// errors produced by this listener.
///
/// Each time a handler is invoked, [`Dispatcher`] adds the following
/// dependencies (in addition to those passed to
/// [`DispatcherBuilder::dependencies`]):
///
/// - Your bot passed to [`Dispatcher::builder`];
/// - An update from Telegram;
/// - [`crate::types::Me`] (can be used in [`HandlerExt::filter_command`]).
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
/// [`HandlerExt::filter_command`]: crate::dispatching2::HandlerExt::filter_command
pub async fn dispatch(&mut self)
where
R: Requester + Clone,
<R as Requester>::GetUpdates: Send,
{
let listener = update_listeners::polling_default(self.bot.clone()).await;
let error_handler =
LoggingErrorHandler::with_custom_text("An error from the update listener");
self.dispatch_with_listener(listener, error_handler).await;
}
/// Starts your bot with custom `update_listener` and
/// `update_listener_error_handler`.
///
/// This method adds the same dependencies as [`Dispatcher::dispatch`].
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a mut self,
mut update_listener: UListener,
update_listener_error_handler: Arc<Eh>,
) where
UListener: UpdateListener<ListenerE> + 'a,
Eh: ErrorHandler<ListenerE> + 'a,
ListenerE: Debug,
{
update_listener.hint_allowed_updates(&mut self.allowed_updates.clone().into_iter());
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token());
self.state.start_dispatching();
{
let stream = update_listener.as_stream();
tokio::pin!(stream);
loop {
// False positive
#[allow(clippy::collapsible_match)]
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
match upd {
None => break,
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
}
}
if self.state.is_shutting_down() {
if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching...");
token.stop();
break;
}
}
}
}
// TODO: wait for executing handlers?
self.state.done();
}
async fn process_update<LErr, LErrHandler>(
&self,
update: Result<Update, LErr>,
err_handler: &Arc<LErrHandler>,
) where
LErrHandler: ErrorHandler<LErr>,
{
match update {
Ok(upd) => {
if let UpdateKind::Error(err) = upd.kind {
log::error!(
"Cannot parse an update.\nError: {:?}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide/issues.",
err,
);
return;
}
let mut deps = self.dependencies.clone();
deps.insert(upd);
deps.insert(self.bot.clone());
deps.insert(
self.cache_me_bot.get_me().send().await.expect("Failed to retrieve 'me'"),
);
match self.handler.dispatch(deps).await {
ControlFlow::Break(Ok(())) => {}
ControlFlow::Break(Err(err)) => {
self.error_handler.clone().handle_error(err).await
}
ControlFlow::Continue(deps) => {
let upd = deps.get();
(self.default_handler)(upd).await;
}
}
}
Err(err) => err_handler.clone().handle_error(err).await,
}
}
/// Setups the `^C` handler that [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
pub fn setup_ctrlc_handler(&mut self) -> &mut Self {
let token = self.state.clone();
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
match token.shutdown() {
Ok(f) => {
log::info!("^C received, trying to shutdown the dispatcher...");
f.await;
log::info!("dispatcher is shutdown...");
}
Err(_) => {
log::info!("^C received, the dispatcher isn't running, ignoring the signal")
}
}
}
});
self
}
/// Returns a shutdown token, which can later be used to shutdown
/// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
self.state.clone()
}
}
#[cfg(test)]
mod tests {
use std::convert::Infallible;
use teloxide_core::Bot;
use super::*;
#[tokio::test]
async fn test_tokio_spawn() {
tokio::spawn(async {
// Just check that this code compiles.
if false {
Dispatcher::<_, Infallible>::builder(Bot::new(""), dptree::entry())
.build()
.dispatch()
.await;
}
})
.await
.unwrap();
}
}

View file

@ -1,110 +0,0 @@
//! A new dispatching model based on [`dptree`].
//!
//! In teloxide, updates are dispatched by a pipleine. The central type is
//! [`dptree::Handler`] -- it represents a handler of an update; since the API
//! is highly declarative, you can combine handlers with each other via such
//! methods as [`dptree::Handler::chain`] and [`dptree::Handler::branch`]. The
//! former method pipes one handler to another one, whilst the latter creates a
//! new node, as communicated by the name. For more information, please refer to
//! the documentation of [`dptree`].
//!
//! The pattern itself is called [chain of responsibility], a well-known design
//! technique across OOP developers. But unlike typical object-oriented design,
//! we employ declarative FP-style functions like [`dptree::filter`],
//! [`dptree::filter_map`], and [`dptree::endpoint`]; these functions create
//! special forms of [`dptree::Handler`]; for more information, please refer to
//! their respective documentation. Each of these higher-order functions accept
//! a closure that is made into a handler -- this closure can take any
//! additional parameters, which must be supplied while creating [`Dispatcher`]
//! (see [`DispatcherBuilder::dependencies`]).
//!
//! The [`Dispatcher`] type puts all these things together: it only provides
//! [`Dispatcher::dispatch`] and a handful of other methods. Once you call
//! `.dispatch()`, it will retrieve updates from the Telegram server and pass
//! them to your handler, which is a parameter of [`Dispatcher::builder`].
//!
//! Let us look at a simple example:
//!
//!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/shared_state.rs))
//!
//! ```no_run
//! use std::sync::atomic::{AtomicU64, Ordering};
//!
//! use once_cell::sync::Lazy;
//! use teloxide::prelude2::*;
//!
//! static MESSAGES_TOTAL: Lazy<AtomicU64> = Lazy::new(AtomicU64::default);
//!
//! # #[tokio::main]
//! # async fn main() {
//! pretty_env_logger::init();
//! log::info!("Starting shared_state_bot...");
//!
//! let bot = Bot::from_env().auto_send();
//!
//! let handler = Update::filter_message().branch(dptree::endpoint(
//! |msg: Message, bot: AutoSend<Bot>| async move {
//! let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed);
//! bot.send_message(msg.chat.id, format!("I received {} messages in total.", previous))
//! .await?;
//! respond(())
//! },
//! ));
//!
//! Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await;
//! # }
//! ```
//!
//! 1. First, we create the bot: `let bot = Bot::from_env().auto_send()`.
//! 2. Then we construct an update handler. While it is possible to handle all
//! kinds of [`crate::types::Update`], here we are only interested in
//! [`crate::types::Message`]: [`UpdateFilterExt::filter_message`] create a
//! handler object which filters all messages out of a generic update.
//! 3. By doing `.branch(dptree::endpoint(...))`, we set up a custom handling
//! closure that receives `msg: Message` and `bot: AutoSend<Bot>`. There are
//! called dependencies: `msg` is supplied by
//! [`UpdateFilterExt::filter_message`], while `bot` is supplied by
//! [`Dispatcher`].
//!
//! That being said, if we receive a message, the dispatcher will call our
//! handler, but if we receive something other than a message (e.g., a channel
//! post), you will see an unhandled update notice in your terminal.
//!
//! This is a very limited example of update pipelining facilities. In more
//! involved scenarios, there are multiple branches and chains; if one element
//! of a chain fails to handle an update, the update will be passed forwards; if
//! no handler succeeds at handling the update, [`Dispatcher`] will invoke a
//! default handler set up via [`DispatcherBuilder::default_handler`].
//!
//! Update pipelining provides several advantages over the typical `match
//! (update.kind) { ... }` approach:
//!
//! 1. It supports _extension_: e.g., you
//! can define extension filters or some other handlers and then combine them in
//! a single place, thus facilitating loose coupling.
//! 2. Pipelining exhibits a natural syntax for expressing message processing.
//! 3. Lastly, it provides a primitive form of [dependency injection (DI)],
//! which allows you to deal with such objects as a bot and various update types
//! easily.
//!
//! For a more involved example, see [`examples/dispatching2_features.rs`](https://github.com/teloxide/teloxide/blob/master/examples/dispatching2_features.rs).
//!
//! TODO: explain a more involved example with multiple branches.
//!
//! [chain of responsibility]: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
//! [dependency injection (DI)]: https://en.wikipedia.org/wiki/Dependency_injection
#[cfg(all(feature = "dispatching2", feature = "ctrlc_handler"))]
pub mod repls;
pub mod dialogue;
mod dispatcher;
mod filter_ext;
mod handler_ext;
mod handler_factory;
pub use dispatcher::{Dispatcher, DispatcherBuilder, UpdateHandler};
pub use filter_ext::{MessageFilterExt, UpdateFilterExt};
pub use handler_ext::HandlerExt;
pub use handler_factory::HandlerFactory;

View file

@ -1,102 +0,0 @@
use crate::{
dispatching::{update_listeners, update_listeners::UpdateListener},
dispatching2::{HandlerExt, UpdateFilterExt},
error_handlers::LoggingErrorHandler,
types::Update,
utils::command::BotCommand,
};
use dptree::di::{DependencyMap, Injectable};
use std::{fmt::Debug, marker::PhantomData};
use teloxide_core::requests::Requester;
/// A [REPL] for commands.
///
/// All errors from an update listener and handler will be logged.
///
/// ## Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// ## Dependency requirements
///
/// - Those of [`HandlerExt::filter_command`].
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl<'a, R, Cmd, H, E, Args>(bot: R, handler: H, cmd: PhantomData<Cmd>)
where
Cmd: BotCommand + Send + Sync + 'static,
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
R: Requester + Clone + Send + Sync + 'static,
<R as Requester>::GetUpdates: Send,
E: Debug + Send + Sync + 'static,
{
let cloned_bot = bot.clone();
commands_repl_with_listener(
bot,
handler,
update_listeners::polling_default(cloned_bot).await,
cmd,
)
.await;
}
/// Like [`commands_repl`], but with a custom [`UpdateListener`].
///
/// All errors from an update listener and handler will be logged.
///
/// ## Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// ## Dependency requirements
///
/// - Those of [`HandlerExt::filter_command`].
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`commands_repl`]: crate::dispatching::repls::commands_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>(
bot: R,
handler: H,
listener: L,
_cmd: PhantomData<Cmd>,
) where
Cmd: BotCommand + Send + Sync + 'static,
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a,
R: Requester + Clone + Send + Sync + 'static,
E: Debug + Send + Sync + 'static,
{
use crate::dispatching2::Dispatcher;
// Other update types are of no interest to use since this REPL is only for
// commands. See <https://github.com/teloxide/teloxide/issues/557>.
let ignore_update = |_upd| Box::pin(async {});
let mut dispatcher = Dispatcher::builder(
bot,
Update::filter_message().filter_command::<Cmd>().branch(dptree::endpoint(handler)),
)
.default_handler(ignore_update)
.build();
#[cfg(feature = "ctrlc_handler")]
dispatcher.setup_ctrlc_handler();
// To make mutable var from immutable.
let mut dispatcher = dispatcher;
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
}

View file

@ -1,96 +0,0 @@
use crate::{
dispatching::{
dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx, InMemStorageError},
update_listeners,
update_listeners::UpdateListener,
Dispatcher, UpdateWithCx,
},
error_handlers::LoggingErrorHandler,
};
use std::{fmt::Debug, future::Future, sync::Arc};
use teloxide_core::{requests::Requester, types::Message};
/// A [REPL] for dialogues.
///
/// All errors from an update listener and handler will be logged. This function
/// uses [`InMemStorage`].
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H)
where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let cloned_requester = requester.clone();
dialogues_repl_with_listener(
requester,
handler,
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
/// Like [`dialogues_repl`], but with a custom [`UpdateListener`].
///
/// All errors from an update listener and handler will be logged. This function
/// uses [`InMemStorage`].
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
requester: R,
handler: H,
listener: L,
) where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a,
R: Requester + Send + Clone + 'static,
{
let handler = Arc::new(handler);
Dispatcher::new(requester)
.messages_handler(DialogueDispatcher::new(
move |DialogueWithCx { cx, dialogue }: DialogueWithCx<
R,
Message,
D,
InMemStorageError,
>| {
let handler = Arc::clone(&handler);
async move {
let dialogue = dialogue.expect("std::convert::Infallible");
handler(cx, dialogue).await
}
},
))
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
}

View file

@ -1,8 +0,0 @@
//! REPLs for dispatching updates.
mod commands_repl;
mod repl;
pub use commands_repl::{commands_repl, commands_repl_with_listener};
//pub use dialogues_repl::{dialogues_repl, dialogues_repl_with_listener};
pub use repl::{repl, repl_with_listener};

View file

@ -1,78 +0,0 @@
use crate::{
dispatching::{update_listeners, update_listeners::UpdateListener},
dispatching2::UpdateFilterExt,
error_handlers::{LoggingErrorHandler, OnError},
types::Update,
};
use dptree::di::{DependencyMap, Injectable};
use std::fmt::Debug;
use teloxide_core::requests::Requester;
/// A [REPL] for messages.
///
/// All errors from an update listener and a handler will be logged.
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn repl<R, H, E, Args>(bot: R, handler: H)
where
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
Result<(), E>: OnError<E>,
E: Debug + Send + Sync + 'static,
R: Requester + Send + Sync + Clone + 'static,
<R as Requester>::GetUpdates: Send,
{
let cloned_bot = bot.clone();
repl_with_listener(bot, handler, update_listeners::polling_default(cloned_bot).await).await;
}
/// Like [`repl`], but with a custom [`UpdateListener`].
///
/// All errors from an update listener and handler will be logged.
///
/// # Caution
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`repl`]: crate::dispatching::repls::repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(bot: R, handler: H, listener: L)
where
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug,
Result<(), E>: OnError<E>,
E: Debug + Send + Sync + 'static,
R: Requester + Clone + Send + Sync + 'static,
{
use crate::dispatching2::Dispatcher;
// Other update types are of no interest to use since this REPL is only for
// messages. See <https://github.com/teloxide/teloxide/issues/557>.
let ignore_update = |_upd| Box::pin(async {});
#[allow(unused_mut)]
let mut dispatcher =
Dispatcher::builder(bot, Update::filter_message().branch(dptree::endpoint(handler)))
.default_handler(ignore_update)
.build();
#[cfg(feature = "ctrlc_handler")]
dispatcher.setup_ctrlc_handler();
dispatcher
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
}

View file

@ -6,7 +6,7 @@
//! //!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/dices.rs)) //! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/dices.rs))
//! ```no_run //! ```no_run
//! use teloxide::prelude2::*; //! use teloxide::prelude::*;
//! //!
//! # #[tokio::main] //! # #[tokio::main]
//! # async fn main() { //! # async fn main() {
@ -15,7 +15,7 @@
//! //!
//! let bot = Bot::from_env().auto_send(); //! let bot = Bot::from_env().auto_send();
//! //!
//! teloxide::repls2::repl(bot, |message: Message, bot: AutoSend<Bot>| async move { //! teloxide::repl(bot, |message: Message, bot: AutoSend<Bot>| async move {
//! bot.send_dice(message.chat.id).await?; //! bot.send_dice(message.chat.id).await?;
//! respond(()) //! respond(())
//! }) //! })
@ -61,25 +61,16 @@
// https://github.com/rust-lang/rust-clippy/issues/7422 // https://github.com/rust-lang/rust-clippy/issues/7422
#![allow(clippy::nonstandard_macro_braces)] #![allow(clippy::nonstandard_macro_braces)]
#[cfg(feature = "ctrlc_handler")] #[cfg(all(feature = "ctrlc_handler"))]
pub use dispatching::repls::{ pub use dispatching::repls::{
commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl, commands_repl, commands_repl_with_listener, repl, repl_with_listener,
repl_with_listener,
}; };
#[cfg(all(feature = "dispatching2", feature = "ctrlc_handler"))]
pub use dispatching2::repls as repls2;
mod logging; mod logging;
// Things from this module is also used for the dispatching2 module.
pub mod dispatching; pub mod dispatching;
#[cfg(feature = "dispatching2")]
pub mod dispatching2;
pub mod error_handlers; pub mod error_handlers;
pub mod prelude; pub mod prelude;
#[cfg(feature = "dispatching2")]
pub mod prelude2;
pub mod utils; pub mod utils;
#[doc(inline)] #[doc(inline)]
@ -88,10 +79,7 @@ pub use teloxide_core::*;
#[cfg(feature = "macros")] #[cfg(feature = "macros")]
pub use teloxide_macros as macros; pub use teloxide_macros as macros;
#[cfg(feature = "dispatching2")]
pub use dptree; pub use dptree;
#[cfg(feature = "macros")]
pub use teloxide_macros::teloxide;
#[cfg(all(feature = "nightly", doctest))] #[cfg(all(feature = "nightly", doctest))]
#[cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("../README.md")))] #[cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("../README.md")))]

View file

@ -1,27 +1,17 @@
//! Commonly used items. //! Commonly used items.
#![deprecated(note = "Use dispatching2 instead")]
#![allow(deprecated)]
pub use crate::{ pub use crate::{
error_handlers::{LoggingErrorHandler, OnError}, error_handlers::{LoggingErrorHandler, OnError},
respond, respond,
}; };
pub use crate::dispatching::{ pub use crate::dispatching::{
dialogue::{ dialogue::Dialogue, Dispatcher, HandlerExt as _, MessageFilterExt as _, UpdateFilterExt as _,
exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition,
TransitionIn, TransitionOut,
},
Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx,
}; };
#[cfg(feature = "macros")]
pub use crate::teloxide;
pub use teloxide_core::types::{ pub use teloxide_core::types::{
CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, PollAnswer, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, PollAnswer,
PreCheckoutQuery, ShippingQuery, PreCheckoutQuery, ShippingQuery, Update,
}; };
#[cfg(feature = "auto-send")] #[cfg(feature = "auto-send")]
@ -30,9 +20,4 @@ pub use crate::adaptors::AutoSend;
#[doc(no_inline)] #[doc(no_inline)]
pub use teloxide_core::prelude::*; pub use teloxide_core::prelude::*;
#[cfg(feature = "frunk")] pub use dptree::{self, prelude::*};
pub use crate::utils::UpState;
pub use tokio::sync::mpsc::UnboundedReceiver;
pub use futures::StreamExt;

View file

@ -1,26 +0,0 @@
//! Commonly used items (`dispatching2`).
pub use crate::{
error_handlers::{LoggingErrorHandler, OnError},
respond,
};
pub use crate::dispatching2::{
dialogue::Dialogue, Dispatcher, HandlerExt as _, MessageFilterExt as _, UpdateFilterExt as _,
};
#[cfg(feature = "macros")]
pub use crate::teloxide;
pub use teloxide_core::types::{
CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, PollAnswer,
PreCheckoutQuery, ShippingQuery, Update,
};
#[cfg(feature = "auto-send")]
pub use crate::adaptors::AutoSend;
#[doc(no_inline)]
pub use teloxide_core::prelude::*;
pub use dptree::{self, prelude::*};

View file

@ -4,7 +4,6 @@ pub mod command;
pub mod html; pub mod html;
pub mod markdown; pub mod markdown;
pub(crate) mod shutdown_token; pub(crate) mod shutdown_token;
mod up_state;
pub use teloxide_core::net::client_from_env; pub use teloxide_core::net::client_from_env;

View file

@ -12,7 +12,7 @@ use tokio::sync::Notify;
use crate::dispatching::update_listeners::UpdateListener; use crate::dispatching::update_listeners::UpdateListener;
/// A token which used to shutdown [`Dispatcher`]. /// A token which used to shutdown [`crate::dispatching::Dispatcher`].
#[derive(Clone)] #[derive(Clone)]
pub struct ShutdownToken { pub struct ShutdownToken {
dispatcher_state: Arc<DispatcherState>, dispatcher_state: Arc<DispatcherState>,
@ -20,7 +20,7 @@ pub struct ShutdownToken {
} }
/// This error is returned from [`ShutdownToken::shutdown`] when trying to /// This error is returned from [`ShutdownToken::shutdown`] when trying to
/// shutdown an idle [`Dispatcher`]. /// shutdown an idle [`crate::dispatching::Dispatcher`].
#[derive(Debug)] #[derive(Debug)]
pub struct IdleShutdownError; pub struct IdleShutdownError;

View file

@ -1,25 +0,0 @@
#![cfg(feature = "frunk")]
use frunk::{from_generic, generic::Generic, hlist::h_cons, into_generic, HCons, HNil};
use std::ops::Add;
/// Constructs a structure from another structure and a field.
///
/// Let `X` be a structure of `field1, ..., fieldN`, `Y` be `field1, ...,
/// fieldN, fieldN+1`. Both `X` and `Y` implement [`Generic`]. Then `Y::up(x,
/// fieldN+1)` constructs `Y` from all the fields of `x: X` plus `Y`'s
/// `fieldN+1`.
///
/// [`Generic`]: https://docs.rs/frunk/latest/frunk/generic/trait.Generic.html
pub trait UpState: Sized {
fn up<Src, F>(src: Src, field: F) -> Self
where
Src: Generic,
Self: Generic<Repr = <<Src as Generic>::Repr as Add<HCons<F, HNil>>>::Output>,
<Src as Generic>::Repr: Add<HCons<F, HNil>>,
{
from_generic(into_generic(src) + h_cons(field, HNil))
}
}
impl<Dst> UpState for Dst {}