added handler_ext.rs, updated dptree version

This commit is contained in:
p0lunin 2021-12-26 16:55:24 +02:00
parent 6959d1c928
commit a644d0b6d2
18 changed files with 151 additions and 81 deletions

View file

@ -25,9 +25,10 @@ authors = [
maintenance = { status = "actively-developed" }
[features]
default = ["native-tls", "ctrlc_handler", "teloxide-core/default"]
default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "new-dispatching"]
old_dispatching = []
old-dispatching = []
new-dispatching = ["dptree"]
sqlite-storage = ["sqlx"]
redis-storage = ["redis"]
@ -74,12 +75,10 @@ teloxide-core = { version = "0.3.3", default-features = false }
#teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "...", default-features = false }
teloxide-macros = { version = "0.4", optional = true }
dptree = { path = "../chakka" }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
# dptree = { git = "https://github.com/p0lunin/dptree", optional = true }
dptree = { git = "https://github.com/p0lunin/dptree", optional = true }
tokio = { version = "1.8", features = ["fs"] }
tokio-util = "0.6"

View file

@ -1,7 +1,11 @@
use std::{error::Error, str::FromStr};
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use teloxide::{prelude::*, types::{ChatPermissions, Me}, utils::command::BotCommand};
use teloxide::{
prelude::*,
types::{ChatPermissions, Me},
utils::command::BotCommand,
};
// Derive BotCommand to parse text with a command into this enumeration.
//

View file

@ -1,7 +1,14 @@
// The version of Heroku ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling.
use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use teloxide::{
dispatching::{
stop_token::AsyncStopToken,
update_listeners::{self, StatefulListener},
},
prelude::*,
types::Update,
};
use std::{convert::Infallible, env, net::SocketAddr};
use tokio::sync::mpsc;
@ -60,9 +67,13 @@ pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListene
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone())
fn streamf<S, T>(state: &mut (S, T)) -> &mut S {
&mut state.0
}
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| {
state.1.clone()
})
}
async fn run() {

View file

@ -20,8 +20,9 @@ async fn run() {
UnboundedReceiverStream::new(rx).for_each_concurrent(None, |query| async move {
// First, create your actual response
let google_search = InlineQueryResultArticle::new(
// Each item needs a unique ID, as well as the response container for the items.
// These can be whatever, as long as they don't conflict.
// Each item needs a unique ID, as well as the response container for the
// items. These can be whatever, as long as they don't
// conflict.
"01".to_string(),
// What the user will actually see
"Google Search",
@ -31,10 +32,10 @@ async fn run() {
query.update.query,
))),
);
// While constructing them from the struct itself is possible, it is preferred to use
// the builder pattern if you wish to add more information to your result.
// Please refer to the documentation for more detailed information about each field.
// https://docs.rs/teloxide/0.5.1/teloxide/types/struct.InlineQueryResultArticle.html
// While constructing them from the struct itself is possible, it is preferred
// to use the builder pattern if you wish to add more
// information to your result. Please refer to the documentation
// for more detailed information about each field. https://docs.rs/teloxide/0.5.1/teloxide/types/struct.InlineQueryResultArticle.html
let ddg_search = InlineQueryResultArticle::new(
"02".to_string(),
"DuckDuckGo Search".to_string(),
@ -52,7 +53,8 @@ async fn run() {
InlineQueryResult::Article(ddg_search),
];
// Send it off! One thing to note -- the ID we use here must be of the query we're responding to.
// Send it off! One thing to note -- the ID we use here must be of the query
// we're responding to.
let response =
query.requester.answer_inline_query(&query.update.id, results).send().await;
if let Err(err) = response {

View file

@ -1,7 +1,14 @@
// The version of ngrok ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling.
use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use teloxide::{
dispatching::{
stop_token::AsyncStopToken,
update_listeners::{self, StatefulListener},
},
prelude::*,
types::Update,
};
use std::{convert::Infallible, net::SocketAddr};
use tokio::sync::mpsc;
@ -25,9 +32,7 @@ pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListene
// You might want to specify a self-signed certificate via .certificate
// method on SetWebhook.
bot.set_webhook(url)
.await
.expect("Cannot setup a webhook");
bot.set_webhook(url).await.expect("Cannot setup a webhook");
let (tx, rx) = mpsc::unbounded_channel();
@ -54,9 +59,13 @@ pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListene
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone())
fn streamf<S, T>(state: &mut (S, T)) -> &mut S {
&mut state.0
}
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| {
state.1.clone()
})
}
async fn run() {

View file

@ -1,7 +1,6 @@
use teloxide::{prelude::*, utils::command::BotCommand};
use std::error::Error;
use std::sync::Arc;
use std::{error::Error, sync::Arc};
#[derive(BotCommand)]
#[command(rename = "lowercase", description = "These commands are supported:")]
@ -20,14 +19,16 @@ async fn answer(
command: Arc<Command>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
match command.as_ref() {
Command::Help => {
bot.send_message(message.chat.id, Command::descriptions()).await?
}
Command::Help => bot.send_message(message.chat.id, Command::descriptions()).await?,
Command::Username(username) => {
bot.send_message(message.chat.id, format!("Your username is @{}.", username)).await?
}
Command::UsernameAndAge { username, age } => {
bot.send_message(message.chat.id, format!("Your username is @{} and age is {}.", username, age)).await?
bot.send_message(
message.chat.id,
format!("Your username is @{} and age is {}.", username, age),
)
.await?
}
};
@ -45,6 +46,6 @@ async fn run() {
let bot = Bot::from_env().auto_send();
let bot_name: String = "".into();// panic!("Your bot's name here");
let bot_name: String = "".into(); // panic!("Your bot's name here");
teloxide::commands_repl(bot, bot_name, answer, Command::ty()).await;
}

View file

@ -7,7 +7,6 @@ edition = "2018"
[dependencies]
# You can also choose "cbor-serializer" or built-in JSON serializer
teloxide = { path = "../../", features = ["sqlite-storage", "bincode-serializer", "redis-storage", "macros", "auto-send"] }
dptree = { path = "../../../chakka" }
log = "0.4.8"
pretty_env_logger = "0.4.0"

View file

@ -1,10 +1,10 @@
use std::sync::Arc;
use teloxide::{
dispatching2::dialogue::{serializer::Json, SqliteStorage, Storage},
prelude::*,
RequestError,
};
use thiserror::Error;
use std::sync::Arc;
type Store = SqliteStorage<Json>;
// FIXME: naming
@ -32,9 +32,9 @@ impl Default for BotDialogue {
}
async fn handle_message(
bot: Arc<AutoSend<Bot>>,
mes: Arc<Message>,
dialogue: Arc<MyDialogue>,
bot: AutoSend<Bot>,
mes: Message,
dialogue: MyDialogue,
) -> Result<(), Error> {
match mes.text() {
None => {
@ -46,14 +46,19 @@ async fn handle_message(
BotDialogue::Start => {
if let Ok(number) = ans.parse() {
dialogue.next(BotDialogue::HaveNumber(number)).await?;
bot.send_message(mes.chat.id, format!("Remembered number {}. Now use /get or /reset", number)).await?;
bot.send_message(
mes.chat.id,
format!("Remembered number {}. Now use /get or /reset", number),
)
.await?;
} else {
bot.send_message(mes.chat.id, "Please, send me a number").await?;
}
}
BotDialogue::HaveNumber(num) => {
if ans.starts_with("/get") {
bot.send_message(mes.chat.id, format!("Here is your number: {}", num)).await?;
bot.send_message(mes.chat.id, format!("Here is your number: {}", num))
.await?;
} else if ans.starts_with("/reset") {
dialogue.reset().await?;
bot.send_message(mes.chat.id, "Resetted number").await?;
@ -62,25 +67,20 @@ async fn handle_message(
}
}
}
},
}
}
Ok(())
}
#[tokio::main]
async fn main() {
let bot = Arc::new(Bot::from_env().auto_send());
let bot = Bot::from_env().auto_send();
let storage = SqliteStorage::open("db.sqlite", Json).await.unwrap();
Dispatcher::new(bot)
.dependencies({
let mut map = dptree::di::DependencyMap::new();
map.insert_arc(storage);
map
})
.dependencies(dptree::deps![storage])
.messages_handler(|h| {
h.add_dialogue::<Message, Store, BotDialogue>()
.branch(dptree::endpoint(handle_message))
h.add_dialogue::<Message, Store, BotDialogue>().branch(dptree::endpoint(handle_message))
})
.dispatch()
.await;

View file

@ -49,7 +49,7 @@ pub mod dialogue;
pub mod stop_token;
pub mod update_listeners;
#[cfg(feature = "old_dispatching")]
#[cfg(feature = "old-dispatching")]
pub(crate) mod repls;
mod dispatcher;
@ -63,7 +63,7 @@ pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt;
use tokio::sync::mpsc::UnboundedReceiver;
pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType};
#[cfg(not(feature = "old_dispatching"))]
#[cfg(feature = "new-dispatching")]
pub(crate) use dispatcher::{
shutdown_check_timeout_for, shutdown_inner, DispatcherState, ShutdownState,
};

View file

@ -1,5 +1,5 @@
use crate::dispatching2::dialogue::{get_chat_id::GetChatId, Dialogue, Storage};
use dptree::{di::DependencyMap, Handler, Insert};
use dptree::{di::DependencyMap, Handler};
use std::sync::Arc;
pub trait DialogueHandlerExt {
@ -7,7 +7,7 @@ pub trait DialogueHandlerExt {
where
S: Storage<D> + Send + Sync + 'static,
D: Send + Sync + 'static,
Upd: GetChatId + Send + Sync + 'static;
Upd: GetChatId + Clone + Send + Sync + 'static;
}
impl<'a, Output> DialogueHandlerExt for Handler<'a, DependencyMap, Output>
@ -19,9 +19,9 @@ where
// FIXME: some of this requirements are useless.
S: Storage<D> + Send + Sync + 'static,
D: Send + Sync + 'static,
Upd: GetChatId + Send + Sync + 'static,
Upd: GetChatId + Clone + Send + Sync + 'static,
{
self.chain(dptree::map(|storage: Arc<S>, upd: Arc<Upd>| async move {
self.chain(dptree::filter_map(|storage: Arc<S>, upd: Upd| async move {
let chat_id = upd.chat_id()?;
Dialogue::new(storage, chat_id).ok()
}))

View file

@ -24,6 +24,18 @@ pub struct Dialogue<D, S> {
_phantom: PhantomData<D>,
}
// #[derive] requires generics to implement Clone,
// but `S` wrapped around Arc, and `D` wrapped around PhantomData.
impl<D, S> Clone for Dialogue<D, S> {
fn clone(&self) -> Self {
Dialogue {
storage: self.storage.clone(),
chat_id: self.chat_id.clone(),
_phantom: PhantomData,
}
}
}
impl<D, S> Dialogue<D, S>
where
D: Send + 'static,

View file

@ -13,7 +13,7 @@ use std::{collections::HashSet, convert::Infallible, fmt::Debug, ops::ControlFlo
use tokio::{sync::Notify, time::timeout};
pub struct Dispatcher<R, Err> {
requester: Arc<R>,
requester: R,
dependencies: DependencyMap,
handler: UpdateHandler<Err>,
@ -32,9 +32,9 @@ pub type DefaultHandler = dptree::Handler<'static, DependencyMap, (), Infallible
macro_rules! make_parser {
($kind:ident) => {
dptree::map(|upd: Arc<Update>| async move {
match &upd.kind {
UpdateKind::$kind(u) => Some(u.clone()),
dptree::filter_map(|upd: Update| async move {
match upd.kind {
UpdateKind::$kind(u) => Some(u),
_ => None,
}
})
@ -43,16 +43,16 @@ macro_rules! make_parser {
impl<R, Err> Dispatcher<R, Err>
where
R: Send + Sync + 'static,
R: Clone + Send + Sync + 'static,
Err: Send + Sync + 'static,
{
pub fn new(requester: Arc<R>) -> Self {
pub fn new(requester: R) -> Self {
Dispatcher {
requester,
dependencies: DependencyMap::new(),
handler: dptree::entry(),
default_handler: dptree::endpoint(|update: Arc<Update>| async move {
log::warn!("Unhandled update: {:?}", update.as_ref())
default_handler: dptree::endpoint(|update: Update| async move {
log::warn!("Unhandled update: {:?}", update)
}),
allowed_updates: Default::default(),
state: Arc::new(Default::default()),
@ -203,7 +203,7 @@ where
Ok(upd) => {
let mut deps = self.dependencies.clone();
deps.insert(upd);
deps.insert_arc(self.requester.clone());
deps.insert(self.requester.clone());
match self.handler.dispatch(deps).await {
ControlFlow::Break(Ok(())) => {}
ControlFlow::Break(Err(_err)) => todo!("error handler"),

View file

@ -0,0 +1,31 @@
use crate::{types::Message, utils::command::BotCommand};
use dptree::{
di::{DependencySupplier, Insert},
Handler,
};
use std::sync::Arc;
pub trait HandlerExt<IR> {
fn add_command<C>(self, bot_name: String) -> Self
where
C: BotCommand + Send,
IR: Insert<C>;
}
impl<Input, Output, IR> HandlerExt<IR> for Handler<'_, Input, Output, IR>
where
Input: Send + Sync + 'static,
Output: Send + Sync + 'static,
IR: Send + Sync + 'static + Clone + DependencySupplier<Message>,
{
fn add_command<C>(self, bot_name: String) -> Self
where
C: BotCommand + Send,
IR: Insert<C>,
{
self.chain(dptree::filter_map(move |message: Message| {
let bot_name = bot_name.clone();
async move { message.text().and_then(|text| C::parse(text, bot_name).ok()) }
}))
}
}

View file

@ -2,5 +2,6 @@ pub(crate) mod repls;
pub mod dialogue;
mod dispatcher;
mod handler_ext;
pub use dispatcher::Dispatcher;

View file

@ -1,10 +1,10 @@
use crate::{
dispatching::{update_listeners, update_listeners::UpdateListener},
dispatching2::Dispatcher,
dispatching2::{handler_ext::HandlerExt, Dispatcher},
error_handlers::{LoggingErrorHandler, OnError},
utils::command::BotCommand,
};
use dptree::di::{DependencyMap, Injector};
use dptree::di::{DependencyMap, Injectable};
use futures::StreamExt;
use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc};
use teloxide_core::{requests::Requester, types::Message};
@ -29,7 +29,7 @@ pub async fn commands_repl<'a, R, Cmd, H, N, E, Args>(
cmd: PhantomData<Cmd>,
) where
Cmd: BotCommand + Send + Sync + 'static,
H: Injector<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
N: Into<String> + Send + 'static,
R: Requester + Clone + Send + Sync + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
@ -68,7 +68,7 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, N, E, Args
_cmd: PhantomData<Cmd>,
) where
Cmd: BotCommand + Send + Sync + 'static,
H: Injector<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a,
N: Into<String> + Send + 'static,
@ -77,13 +77,8 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, N, E, Args
{
let bot_name = bot_name.into();
let dispatcher = Dispatcher::new(Arc::new(requester)).messages_handler(|h| {
h.chain(dptree::map(move |message: Arc<Message>| {
let bot_name = bot_name.clone();
async move { message.text().and_then(|text| Cmd::parse(text, bot_name).ok()) }
}))
.branch(dptree::endpoint(handler))
});
let dispatcher = Dispatcher::new(Arc::new(requester))
.messages_handler(|h| h.add_command::<Cmd>(bot_name).branch(dptree::endpoint(handler)));
#[cfg(feature = "ctrlc_handler")]
let dispatcher = dispatcher.setup_ctrlc_handler();

View file

@ -3,7 +3,7 @@ use crate::{
dispatching2::Dispatcher,
error_handlers::{LoggingErrorHandler, OnError},
};
use dptree::di::{DependencyMap, Injector};
use dptree::di::{DependencyMap, Injectable};
use std::{fmt::Debug, sync::Arc};
use teloxide_core::requests::Requester;
@ -21,7 +21,7 @@ use teloxide_core::requests::Requester;
#[cfg(feature = "ctrlc_handler")]
pub async fn repl<R, H, E, Args>(requester: R, handler: H)
where
H: Injector<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
Result<(), E>: OnError<E>,
E: Debug + Send + Sync + 'static,
R: Requester + Send + Sync + Clone + 'static,
@ -54,7 +54,7 @@ pub async fn repl_with_listener<'a, R, H, E, L, ListenerE, Args>(
handler: H,
listener: L,
) where
H: Injector<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
H: Injectable<DependencyMap, Result<(), E>, Args> + Send + Sync + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug,
Result<(), E>: OnError<E>,

View file

@ -60,21 +60,25 @@
// https://github.com/rust-lang/rust-clippy/issues/7422
#![allow(clippy::nonstandard_macro_braces)]
#[cfg(feature = "old_dispatching")]
#[cfg(all(feature = "new-dispatching", feature = "old-dispatching"))]
compile_error!("You can use only one of еру dispatching systems, not both.");
#[cfg(feature = "old-dispatching")]
pub use dispatching::repls::{
commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl,
repl_with_listener,
};
#[cfg(not(feature = "old_dispatching"))]
#[cfg(feature = "new-dispatching")]
pub use dispatching2::repls::{
commands_repl, commands_repl_with_listener, repl, repl_with_listener,
};
mod logging;
// Things from this module is also used for the dispatching2 module.
pub mod dispatching;
#[cfg(not(feature = "old_dispatching"))]
#[cfg(feature = "new-dispatching")]
pub mod dispatching2;
pub mod error_handlers;
pub mod prelude;

View file

@ -5,7 +5,7 @@ pub use crate::{
respond,
};
#[cfg(feature = "old_dispatching")]
#[cfg(feature = "old-dispatching")]
pub use crate::dispatching::{
dialogue::{
exit, next, DialogueDispatcher, DialogueStage, DialogueWithCx, GetChatId, Transition,
@ -14,7 +14,7 @@ pub use crate::dispatching::{
Dispatcher, DispatcherHandlerRx, DispatcherHandlerRxExt, UpdateWithCx,
};
#[cfg(not(feature = "old_dispatching"))]
#[cfg(feature = "new-dispatching")]
pub use crate::dispatching2::{
dialogue::{Dialogue, DialogueHandlerExt as _},
Dispatcher,
@ -42,3 +42,5 @@ pub use crate::utils::UpState;
pub use tokio::sync::mpsc::UnboundedReceiver;
pub use futures::StreamExt;
pub use dptree::{self, prelude::*};