From fdd79760e1e8b01b4488022cc0e681df375fc354 Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 13 Sep 2019 00:37:45 +0300 Subject: [PATCH 01/15] Dispatcher POC impl --- Cargo.toml | 4 +- src/dispatcher/filter.rs | 243 +++++++++++++++++++++++++++++++++ src/dispatcher/handler.rs | 18 +++ src/dispatcher/mod.rs | 8 ++ src/dispatcher/simple/mod.rs | 254 +++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 6 files changed, 527 insertions(+), 1 deletion(-) create mode 100644 src/dispatcher/filter.rs create mode 100644 src/dispatcher/handler.rs create mode 100644 src/dispatcher/mod.rs create mode 100644 src/dispatcher/simple/mod.rs diff --git a/Cargo.toml b/Cargo.toml index d4c2b459..f3f24f6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,6 @@ lazy_static = "1.3" apply = "0.2.2" derive_more = "0.15.0" tokio = "0.2.0-alpha.4" -bytes = "0.4.12" \ No newline at end of file +bytes = "0.4.12" +log = "0.4.8" +futures-util-preview = "0.3.0-alpha.18" diff --git a/src/dispatcher/filter.rs b/src/dispatcher/filter.rs new file mode 100644 index 00000000..57eb6c84 --- /dev/null +++ b/src/dispatcher/filter.rs @@ -0,0 +1,243 @@ +/// Filter that determines that particular event +/// is suitable for particular handler. +pub trait Filter { + /// Passes (return true) if event is suitable (otherwise return false) + fn test(&self, value: &T) -> bool; +} + +/// ``` +/// use async_telegram_bot::dispatcher::filter::Filter; +/// +/// let closure = |i: &i32| -> bool { *i >= 42 }; +/// assert!(closure.test(&42)); +/// assert!(closure.test(&100)); +/// +/// assert_eq!(closure.test(&41), false); +/// assert_eq!(closure.test(&0), false); +/// ``` +impl bool> Filter for F { + fn test(&self, value: &T) -> bool { + (self)(value) + } +} + +/// ``` +/// use async_telegram_bot::dispatcher::filter::Filter; +/// +/// assert!(true.test(&())); +/// assert_eq!(false.test(&()), false); +/// ``` +impl Filter for bool { + fn test(&self, _: &T) -> bool { *self } +} + +/// And filter. +/// +/// Passes if both underlying filters pass. +/// +/// **NOTE**: if one of filters don't pass +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{And, Filter}; +/// +/// // Note: bool can be treated as `Filter` that always return self. +/// assert_eq!(And::new(true, false).test(&()), false); +/// assert_eq!(And::new(true, false).test(&()), false); +/// assert!(And::new(true, true).test(&())); +/// assert!(And::new(true, And::new(|_: &()| true, true)).test(&())); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct And(A, B); + +impl And { + pub fn new(a: A, b: B) -> Self { + And(a, b) + } +} + +impl Filter for And +where + A: Filter, + B: Filter, +{ + fn test(&self, value: &T) -> bool { + self.0.test(value) && self.1.test(value) + } +} + +/// Alias for [`And::new`] +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{and, Filter}; +/// +/// assert!(and(true, true).test(&())); +/// assert_eq!(and(true, false).test(&()), false); +/// ``` +/// +/// [`And::new`]: crate::dispatcher::filter::And::new +pub fn and(a: A, b: B) -> And { + And::new(a, b) +} + + +/// Or filter. +/// +/// Passes if at least one underlying filters passes. +/// +/// **NOTE**: if one of filters passes +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{Or, Filter}; +/// +/// // Note: bool can be treated as `Filter` that always return self. +/// assert!(Or::new(true, false).test(&())); +/// assert!(Or::new(false, true).test(&())); +/// assert!(Or::new(false, Or::new(|_: &()| true, false)).test(&())); +/// assert_eq!(Or::new(false, false).test(&()), false); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct Or(A, B); + +impl Or { + pub fn new(a: A, b: B) -> Self { + Or(a, b) + } +} + +impl Filter for Or +where + A: Filter, + B: Filter, +{ + fn test(&self, value: &T) -> bool { + self.0.test(value) || self.1.test(value) + } +} + +/// Alias for [`Or::new`] +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{or, Filter}; +/// +/// assert!(or(true, false).test(&())); +/// assert_eq!(or(false, false).test(&()), false); +/// ``` +/// +/// [`Or::new`]: crate::dispatcher::filter::Or::new +pub fn or(a: A, b: B) -> Or { + Or::new(a, b) +} + + +/// Not filter. +/// +/// Passes if underlying filter don't pass. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{Not, Filter}; +/// +/// // Note: bool can be treated as `Filter` that always return self. +/// assert!(Not::new(false).test(&())); +/// assert_eq!(Not::new(true).test(&()), false); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct Not(A); + +impl Not { + pub fn new(a: A) -> Self { + Not(a) + } +} + +impl Filter for Not +where + A: Filter, +{ + fn test(&self, value: &T) -> bool { + !self.0.test(value) + } +} + +/// Alias for [`Not::new`] +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{not, Filter}; +/// +/// assert!(not(false).test(&())); +/// assert_eq!(not(true).test(&()), false); +/// ``` +/// +/// [`Not::new`]: crate::dispatcher::filter::Not::new +pub fn not(a: A) -> Not { + Not::new(a) +} + +/// Return [filter] that passes if and only if all given filters passes. +/// +/// **NOTE**: if one of filters don't pass +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::{all, dispatcher::filter::Filter}; +/// +/// assert!(all![true].test(&())); +/// assert!(all![true, true].test(&())); +/// assert!(all![true, true, true].test(&())); +/// +/// assert_eq!(all![false].test(&()), false); +/// assert_eq!(all![true, false].test(&()), false); +/// assert_eq!(all![false, true].test(&()), false); +/// assert_eq!(all![false, false].test(&()), false); +/// ``` +/// +/// [filter]: crate::dispatcher::filter::Filter +#[macro_export] +macro_rules! all { + ($one:expr) => { $one }; + ($head:expr, $($tail:tt)+) => { + $crate::dispatcher::filter::And::new( + $head, + $crate::all!($($tail)+) + ) + }; +} + +/// Return [filter] that passes if and only if any given filters passes. +/// +/// **NOTE**: if one of filters passes +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::{any, dispatcher::filter::Filter}; +/// +/// assert!(any![true].test(&())); +/// assert!(any![true, true].test(&())); +/// assert!(any![false, true].test(&())); +/// assert!(any![true, false, true].test(&())); +/// +/// assert_eq!(any![false].test(&()), false); +/// assert_eq!(any![false, false].test(&()), false); +/// assert_eq!(any![false, false, false].test(&()), false); +/// ``` +/// +/// [filter]: crate::dispatcher::filter::Filter +#[macro_export] +macro_rules! any { + ($one:expr) => { $one }; + ($head:expr, $($tail:tt)+) => { + $crate::dispatcher::filter::Or::new( + $head, + $crate::all!($($tail)+) + ) + }; +} diff --git a/src/dispatcher/handler.rs b/src/dispatcher/handler.rs new file mode 100644 index 00000000..b36f3e93 --- /dev/null +++ b/src/dispatcher/handler.rs @@ -0,0 +1,18 @@ +use std::pin::Pin; +use std::future::Future; + + +/// Asynchronous handler for event `I` (like `&self, I -> Future` fn) +pub trait Handler { + fn handle(&self, value: I) -> Pin + 'static>>; +} + +impl Handler for F +where + Fut: Future + 'static, + F: Fn(T) -> Fut, +{ + fn handle(&self, value: T) -> Pin + 'static>> { + Box::pin((self)(value)) + } +} diff --git a/src/dispatcher/mod.rs b/src/dispatcher/mod.rs new file mode 100644 index 00000000..f4f068f0 --- /dev/null +++ b/src/dispatcher/mod.rs @@ -0,0 +1,8 @@ +/// POC implementation of update dispatching + +pub mod filter; +pub mod handler; +pub mod simple; + +pub use filter::Filter; +pub use handler::Handler; diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs new file mode 100644 index 00000000..c3271a03 --- /dev/null +++ b/src/dispatcher/simple/mod.rs @@ -0,0 +1,254 @@ +use std::{future::Future, pin::Pin}; + +use crate::{ + dispatcher::{ + filter::Filter, + handler::Handler, + }, + core::types::{ + Update, + Message, + UpdateKind, + CallbackQuery, + ChosenInlineResult, + }, +}; + +use tokio::stream::Stream; + + +pub type Handlers = Vec<(Box>, Box>)>; + +pub struct Dispatcher { + message_handlers: Handlers, + edited_message_handlers: Handlers, + channel_post_handlers: Handlers, + edited_channel_post_handlers: Handlers, + inline_query_handlers: Handlers<()>, + chosen_inline_result_handlers: Handlers, + callback_query_handlers: Handlers, +} + +impl Dispatcher { + pub fn new() -> Self { + Dispatcher { + message_handlers: Vec::new(), + edited_message_handlers: Vec::new(), + channel_post_handlers: Vec::new(), + edited_channel_post_handlers: Vec::new(), + inline_query_handlers: Vec::new(), + chosen_inline_result_handlers: Vec::new(), + callback_query_handlers: Vec::new() + } + } + + pub fn message_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'static, + H: Handler + 'static, + { + self.message_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'static, + H: Handler + 'static, + { + self.edited_message_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'static, + H: Handler + 'static, + { + self.channel_post_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn edited_channel_post_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'static, + H: Handler + 'static, + { + self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter<()> + 'static, + H: Handler<()> + 'static, + { + self.inline_query_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn chosen_inline_result_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'static, + H: Handler + 'static, + { + self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'static, + H: Handler + 'static, + { + self.callback_query_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + // TODO: Can someone simplify this? + pub async fn dispatch(&mut self, updates: S) + where + S: Stream + { + use futures_util::stream::StreamExt; + + let dp = &*self; + + updates.for_each(|Update { id, kind }| async move { + log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); + + match kind { + UpdateKind::Message(mes) => { + call_handler( + find_handler(&dp.message_handlers, &mes), + mes + ) + .await; + }, + UpdateKind::EditedMessage(mes) => { + call_handler( + find_handler(&dp.edited_message_handlers, &mes), + mes + ) + .await; + }, + UpdateKind::ChannelPost(post) => { + call_handler( + find_handler(&dp.channel_post_handlers, &post), + post + ) + .await; + }, + UpdateKind::EditedChannelPost(post) => { + call_handler( + find_handler(&dp.edited_channel_post_handlers, &post), + post + ) + .await; + }, + UpdateKind::InlineQuery(query) => { + call_handler( + find_handler(&dp.inline_query_handlers, &query), + query + ) + .await; + }, + UpdateKind::ChosenInlineResult(result) => { + call_handler( + find_handler(&dp.chosen_inline_result_handlers, &result), + result + ) + .await; + }, + UpdateKind::CallbackQuery(callback) => { + call_handler( + find_handler(&dp.callback_query_handlers, &callback), + callback + ) + .await; + }, + } + }) + .await; + } +} + +/// Helper function +fn find_handler<'a, T: std::fmt::Debug>(handlers: &'a Handlers, value: &T) -> Option<&'a Box>> { + let handler = handlers.iter().find_map(|e| { + let (filter, handler) = e; + if filter.test(value) { + Some(handler) + } else { + None + } + }); + + handler +} + +/// Helper function +async fn call_handler(handler: Option<&Box>>, value: T) { + match handler { + Some(handler) => handler.handle(value).await, + None => log::warn!("Unhandled update: {:?}", value) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test() { + use crate::{ + core::types::{ + Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind + }, + dispatcher::simple::Dispatcher, + }; + + let mes = Message { + id: 6534, + date: 1567898953, + chat: Chat { + id: 218485655, + photo: None, + kind: ChatKind::Private { + type_: (), + first_name: Some("W".to_string()), + last_name: None, + username: Some("WaffleLapkin".to_string()), + }, + }, + message_kind: MessageKind::IncomingMessage { + from: Sender::User(User { + id: 457569668, + is_bot: true, + first_name: "BT".to_string(), + last_name: None, + username: Some("BloodyTestBot".to_string()), + language_code: None, + }), + forward_kind: ForwardKind::Origin { + reply_to_message: None, + }, + edit_date: None, + media_kind: MediaKind::Text { + text: "text".to_string(), + entities: vec![], + }, + reply_markup: None, + }, + }; + + async fn handler(mes: Message) { + println!("{:#?}", mes) + } + + let mut dp = Dispatcher::new() + .message_handler(true, handler); + + dp.dispatch(tokio::stream::iter(vec![Update { id: 0, kind: UpdateKind::Message(mes) }])).await; + } +} diff --git a/src/lib.rs b/src/lib.rs index 0d2c5221..081b06a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,3 +6,4 @@ extern crate serde; pub mod bot; pub mod core; pub mod keyboards; +pub mod dispatcher; From 4519109ac5939d487b45dc220c89ed2f3a936780 Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 13 Sep 2019 14:57:02 +0300 Subject: [PATCH 02/15] Implement wrapper for filters that adds `|`, `&` operators --- src/dispatcher/filter.rs | 68 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/src/dispatcher/filter.rs b/src/dispatcher/filter.rs index 57eb6c84..ac3b1f67 100644 --- a/src/dispatcher/filter.rs +++ b/src/dispatcher/filter.rs @@ -241,3 +241,71 @@ macro_rules! any { ) }; } + + +/// Simple wrapper around `Filter` that adds `|` and `&` operators. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{Filter, f, F, And, Or}; +/// +/// let flt1 = |i: &i32| -> bool { *i > 17 }; +/// let flt2 = |i: &i32| -> bool { *i < 42 }; +/// let flt3 = |i: &i32| -> bool { *i % 2 == 0 }; +/// +/// let and = f(flt1) & flt2; +/// assert!(and.test(&19)); // both filters pass +/// +/// assert_eq!(and.test(&50), false); // `flt2` doesn't pass +/// assert_eq!(and.test(&16), false); // `flt1` doesn't pass +/// +/// +/// let or = f(flt1) | flt3; +/// assert!(or.test(&19)); // `flt1` passes +/// assert!(or.test(&16)); // `flt2` passes +/// assert!(or.test(&20)); // both pass +/// +/// assert_eq!(or.test(&17), false); // both don't pass +/// +/// +/// // Note: only first filter in chain should be wrapped in `f(...)` +/// let complicated: F, _>>= f(flt1) & flt2 | flt3; +/// assert!(complicated.test(&2)); // `flt3` passes +/// assert!(complicated.test(&21)); // `flt1` and `flt2` pass +/// +/// assert_eq!(complicated.test(&15), false); // `flt1` and `flt3` don't pass +/// assert_eq!(complicated.test(&43), false); // `flt2` and `flt3` don't pass +/// ``` +pub struct F(A); + +/// Constructor fn for [F] +/// +/// [F]: crate::dispatcher::filter::F; +pub fn f(a: A) -> F { + F(a) +} + +impl Filter for F +where + A: Filter +{ + fn test(&self, value: &T) -> bool { + self.0.test(value) + } +} + +impl std::ops::BitAnd for F { + type Output = F>; + + fn bitand(self, other: B) -> Self::Output { + f(and(self.0, other)) + } +} + +impl std::ops::BitOr for F { + type Output = F>; + + fn bitor(self, other: B) -> Self::Output { + f(or(self.0, other)) + } +} From 5d0bb05ee6e943b6dc6bc28a50f925d03382a1f9 Mon Sep 17 00:00:00 2001 From: Waffle Date: Mon, 16 Sep 2019 21:06:49 +0300 Subject: [PATCH 03/15] Implement extensions for filters --- src/dispatcher/filter.rs | 61 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/src/dispatcher/filter.rs b/src/dispatcher/filter.rs index ac3b1f67..b783d31f 100644 --- a/src/dispatcher/filter.rs +++ b/src/dispatcher/filter.rs @@ -309,3 +309,64 @@ impl std::ops::BitOr for F { f(or(self.0, other)) } } + +/// Extensions for filters +pub trait FilterExt { + /// Alias for [`Not::new`] + /// + /// ## Examples + /// ``` + /// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt}; + /// + /// let flt = |i: &i32| -> bool { *i > 0 }; + /// let flt = flt.not(); + /// assert!(flt.test(&-1)); + /// assert_eq!(flt.test(&1), false); + /// ``` + /// + /// [`Not::new`]: crate::dispatcher::filter::Not::new + fn not(self) -> Not where Self: Sized { + Not::new(self) + } + + /// Alias for [`And::new`] + /// + /// ## Examples + /// ``` + /// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt}; + /// + /// let flt = |i: &i32| -> bool { *i > 0 }; + /// let flt = flt.and(|i: &i32| *i < 42); + /// + /// assert!(flt.test(&1)); + /// assert_eq!(flt.test(&-1), false); + /// assert_eq!(flt.test(&43), false); + /// ``` + /// + /// [`Not::new`]: crate::dispatcher::filter::And::new + fn and(self, other: B) -> And where Self: Sized { + And::new(self, other) + } + + /// Alias for [`Or::new`] + /// + /// ## Examples + /// ``` + /// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt}; + /// + /// let flt = |i: &i32| -> bool { *i < 0 }; + /// let flt = flt.or(|i: &i32| *i > 42); + /// + /// assert!(flt.test(&-1)); + /// assert!(flt.test(&43)); + /// assert_eq!(flt.test(&17), false); + /// ``` + /// + /// [`Not::new`]: crate::dispatcher::filter::Or::new + fn or(self, other: B) -> Or where Self: Sized { + Or::new(self, other) + } +} + +// All methods implemented via defaults +impl FilterExt for F where F: Filter {} From e95ffe13b743ed2c40e763f2304f3b58892f5a48 Mon Sep 17 00:00:00 2001 From: Waffle Date: Mon, 16 Sep 2019 21:19:46 +0300 Subject: [PATCH 04/15] Fix docs for `all!` and `any!` macroses --- src/dispatcher/filter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dispatcher/filter.rs b/src/dispatcher/filter.rs index b783d31f..e1c267ad 100644 --- a/src/dispatcher/filter.rs +++ b/src/dispatcher/filter.rs @@ -180,7 +180,7 @@ pub fn not(a: A) -> Not { Not::new(a) } -/// Return [filter] that passes if and only if all given filters passes. +/// Return [filter] that passes if and only if all of the given filters passes. /// /// **NOTE**: if one of filters don't pass /// it is **not** guaranteed that other will be executed. @@ -211,7 +211,7 @@ macro_rules! all { }; } -/// Return [filter] that passes if and only if any given filters passes. +/// Return [filter] that passes if any of the given filters passes. /// /// **NOTE**: if one of filters passes /// it is **not** guaranteed that other will be executed. From 36f146580e42a122800bf68879841a4a65d6f2b3 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 21 Sep 2019 01:22:31 +0300 Subject: [PATCH 05/15] Fix build after resolving conflicts --- src/dispatcher/simple/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index c3271a03..f3be25b5 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -5,7 +5,7 @@ use crate::{ filter::Filter, handler::Handler, }, - core::types::{ + types::{ Update, Message, UpdateKind, @@ -110,7 +110,7 @@ impl Dispatcher { where S: Stream { - use futures_util::stream::StreamExt; + use futures::StreamExt; let dp = &*self; @@ -202,7 +202,7 @@ mod tests { #[tokio::test] async fn test() { use crate::{ - core::types::{ + types::{ Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind }, dispatcher::simple::Dispatcher, @@ -221,7 +221,7 @@ mod tests { username: Some("WaffleLapkin".to_string()), }, }, - message_kind: MessageKind::IncomingMessage { + kind: MessageKind::Common { from: Sender::User(User { id: 457569668, is_bot: true, From 1ca7e7e226fa6614b7bed3cde28ee30be3ae3ed5 Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 27 Sep 2019 15:26:52 +0300 Subject: [PATCH 06/15] Add `Bot::get_updates` method --- Cargo.toml | 1 + src/bot/api.rs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index eae80686..197274cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ tokio = "0.2.0-alpha.4" bytes = "0.4.12" log = "0.4.8" futures-preview = "0.3.0-alpha.18" +pin-project = "0.4.0-alpha.7" diff --git a/src/bot/api.rs b/src/bot/api.rs index 37efa01f..e3f963ee 100644 --- a/src/bot/api.rs +++ b/src/bot/api.rs @@ -1,3 +1,4 @@ +use crate::requests::get_updates::GetUpdates; use crate::{ bot::Bot, requests::{ @@ -17,6 +18,10 @@ impl Bot { GetMe::new(self.ctx()) } + pub fn get_updates(&self) -> GetUpdates { + GetUpdates::new(self.ctx()) + } + pub fn send_message(&self, chat_id: C, text: T) -> SendMessage where C: Into, From eb090ab2afc3a69f60e7b830324d48f9d1ad6f3a Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 27 Sep 2019 15:41:47 +0300 Subject: [PATCH 07/15] Fix `Update` deserialization + add test --- src/types/update.rs | 75 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/src/types/update.rs b/src/types/update.rs index df9d2a4b..c6282297 100644 --- a/src/types/update.rs +++ b/src/types/update.rs @@ -9,7 +9,6 @@ pub struct Update { } #[derive(Debug, Deserialize, PartialEq, Clone)] -#[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum UpdateKind { Message(Message), @@ -21,4 +20,76 @@ pub enum UpdateKind { CallbackQuery(CallbackQuery), } -// TODO: tests for deserialization +#[cfg(test)] +mod test { + use crate::types::{ + Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind, Sender, + Update, UpdateKind, User, + }; + + // TODO: more tests for deserialization + #[test] + fn message() { + let json = r#"{ + "update_id":892252934, + "message":{ + "message_id":6557, + "from":{ + "id":218485655, + "is_bot": false, + "first_name":"Waffle", + "username":"WaffleLapkin", + "language_code":"en" + }, + "chat":{ + "id":218485655, + "first_name":"Waffle", + "username":"WaffleLapkin", + "type":"private" + }, + "date":1569518342, + "text":"hello there" + } + }"#; + + let expected: Update = Update { + id: 892252934, + kind: UpdateKind::Message(Message { + id: 6557, + date: 1569518342, + chat: Chat { + id: 218485655, + kind: ChatKind::Private { + type_: (), + username: Some(String::from("WaffleLapkin")), + first_name: Some(String::from("Waffle")), + last_name: None, + }, + photo: None, + }, + kind: MessageKind::Common { + from: Sender::User(User { + id: 218485655, + is_bot: false, + first_name: String::from("Waffle"), + last_name: None, + username: Some(String::from("WaffleLapkin")), + language_code: Some(String::from("en")), + }), + forward_kind: ForwardKind::Origin { + reply_to_message: None, + }, + edit_date: None, + media_kind: MediaKind::Text { + text: String::from("hello there"), + entities: vec![], + }, + reply_markup: None, + }, + }), + }; + + let actual = serde_json::from_str::(json).unwrap(); + assert_eq!(expected, actual); + } +} From f4a3c4f6bdd483574b15ce08c9b6f123dee78334 Mon Sep 17 00:00:00 2001 From: Waffle Date: Fri, 27 Sep 2019 15:51:22 +0300 Subject: [PATCH 08/15] Implement polling... --- src/dispatcher/handler.rs | 13 ++- src/dispatcher/mod.rs | 2 +- src/dispatcher/simple/mod.rs | 177 ++++++++++++++--------------------- src/dispatcher/updater.rs | 62 ++++++++++++ 4 files changed, 138 insertions(+), 116 deletions(-) create mode 100644 src/dispatcher/updater.rs diff --git a/src/dispatcher/handler.rs b/src/dispatcher/handler.rs index b36f3e93..c17dcc9d 100644 --- a/src/dispatcher/handler.rs +++ b/src/dispatcher/handler.rs @@ -1,18 +1,17 @@ -use std::pin::Pin; use std::future::Future; - +use std::pin::Pin; /// Asynchronous handler for event `I` (like `&self, I -> Future` fn) -pub trait Handler { - fn handle(&self, value: I) -> Pin + 'static>>; +pub trait Handler<'a, I> { + fn handle(&self, value: I) -> Pin + 'a>>; } -impl Handler for F +impl<'a, Fut, T, F> Handler<'a, T> for F where - Fut: Future + 'static, + Fut: Future + 'a, F: Fn(T) -> Fut, { - fn handle(&self, value: T) -> Pin + 'static>> { + fn handle(&self, value: T) -> Pin + 'a>> { Box::pin((self)(value)) } } diff --git a/src/dispatcher/mod.rs b/src/dispatcher/mod.rs index f4f068f0..6805dd8f 100644 --- a/src/dispatcher/mod.rs +++ b/src/dispatcher/mod.rs @@ -1,8 +1,8 @@ /// POC implementation of update dispatching - pub mod filter; pub mod handler; pub mod simple; +pub mod updater; pub use filter::Filter; pub use handler::Handler; diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index f3be25b5..46f54559 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -1,9 +1,8 @@ -use std::{future::Future, pin::Pin}; - use crate::{ dispatcher::{ filter::Filter, handler::Handler, + updater::Updater, }, types::{ Update, @@ -14,22 +13,22 @@ use crate::{ }, }; -use tokio::stream::Stream; +use futures::StreamExt; -pub type Handlers = Vec<(Box>, Box>)>; +pub type Handlers<'a, T> = Vec<(Box + 'a>, Box + 'a>)>; -pub struct Dispatcher { - message_handlers: Handlers, - edited_message_handlers: Handlers, - channel_post_handlers: Handlers, - edited_channel_post_handlers: Handlers, - inline_query_handlers: Handlers<()>, - chosen_inline_result_handlers: Handlers, - callback_query_handlers: Handlers, +pub struct Dispatcher<'a> { + message_handlers: Handlers<'a, Message>, + edited_message_handlers: Handlers<'a, Message>, + channel_post_handlers: Handlers<'a, Message>, + edited_channel_post_handlers: Handlers<'a, Message>, + inline_query_handlers: Handlers<'a, ()>, + chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult>, + callback_query_handlers: Handlers<'a, CallbackQuery>, } -impl Dispatcher { +impl<'a> Dispatcher<'a> { pub fn new() -> Self { Dispatcher { message_handlers: Vec::new(), @@ -44,8 +43,8 @@ impl Dispatcher { pub fn message_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.message_handlers.push((Box::new(filter), Box::new(handler))); self @@ -53,8 +52,8 @@ impl Dispatcher { pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.edited_message_handlers.push((Box::new(filter), Box::new(handler))); self @@ -62,8 +61,8 @@ impl Dispatcher { pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.channel_post_handlers.push((Box::new(filter), Box::new(handler))); self @@ -71,8 +70,8 @@ impl Dispatcher { pub fn edited_channel_post_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler))); self @@ -80,8 +79,8 @@ impl Dispatcher { pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self where - F: Filter<()> + 'static, - H: Handler<()> + 'static, + F: Filter<()> + 'a, + H: Handler<'a, ()> + 'a, { self.inline_query_handlers.push((Box::new(filter), Box::new(handler))); self @@ -89,8 +88,8 @@ impl Dispatcher { pub fn chosen_inline_result_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, ChosenInlineResult> + 'a, { self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler))); self @@ -98,114 +97,73 @@ impl Dispatcher { pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, CallbackQuery> + 'a, { self.callback_query_handlers.push((Box::new(filter), Box::new(handler))); self } // TODO: Can someone simplify this? - pub async fn dispatch(&mut self, updates: S) + pub async fn dispatch(&mut self, updates: U) where - S: Stream + U: Updater + 'a { - use futures::StreamExt; + updates.for_each(|res| { + async { + let res = res; + let Update { kind, id } = match res { + Ok(upd) => upd, + _ => return // TODO: proper error handling + }; - let dp = &*self; + log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); - updates.for_each(|Update { id, kind }| async move { - log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); + // TODO: can someone extract this to a function? + macro_rules! call { + ($h:expr, $value:expr) => {{ + let value = $value; + let handler = $h.iter().find_map(|e| { + let (filter, handler) = e; + if filter.test(&value) { + Some(handler) + } else { + None + } + }); - match kind { - UpdateKind::Message(mes) => { - call_handler( - find_handler(&dp.message_handlers, &mes), - mes - ) - .await; - }, - UpdateKind::EditedMessage(mes) => { - call_handler( - find_handler(&dp.edited_message_handlers, &mes), - mes - ) - .await; - }, - UpdateKind::ChannelPost(post) => { - call_handler( - find_handler(&dp.channel_post_handlers, &post), - post - ) - .await; - }, - UpdateKind::EditedChannelPost(post) => { - call_handler( - find_handler(&dp.edited_channel_post_handlers, &post), - post - ) - .await; - }, - UpdateKind::InlineQuery(query) => { - call_handler( - find_handler(&dp.inline_query_handlers, &query), - query - ) - .await; - }, - UpdateKind::ChosenInlineResult(result) => { - call_handler( - find_handler(&dp.chosen_inline_result_handlers, &result), - result - ) - .await; - }, - UpdateKind::CallbackQuery(callback) => { - call_handler( - find_handler(&dp.callback_query_handlers, &callback), - callback - ) - .await; - }, + match handler { + Some(handler) => handler.handle(value).await, + None => log::warn!("Unhandled update: {:?}", value) + } + }}; + } + + match kind { + UpdateKind::Message(mes) => call!(self.message_handlers, mes), + UpdateKind::EditedMessage(mes) => call!(self.edited_message_handlers, mes), + UpdateKind::ChannelPost(post) => call!(self.channel_post_handlers, post), + UpdateKind::EditedChannelPost(post) => call!(self.edited_channel_post_handlers, post), + UpdateKind::InlineQuery(query) => call!(self.inline_query_handlers, query), + UpdateKind::ChosenInlineResult(result) => call!(self.chosen_inline_result_handlers, result), + UpdateKind::CallbackQuery(callback) => call!(self.callback_query_handlers, callback), + } } }) .await; } } -/// Helper function -fn find_handler<'a, T: std::fmt::Debug>(handlers: &'a Handlers, value: &T) -> Option<&'a Box>> { - let handler = handlers.iter().find_map(|e| { - let (filter, handler) = e; - if filter.test(value) { - Some(handler) - } else { - None - } - }); - - handler -} - -/// Helper function -async fn call_handler(handler: Option<&Box>>, value: T) { - match handler { - Some(handler) => handler.handle(value).await, - None => log::warn!("Unhandled update: {:?}", value) - } -} #[cfg(test)] mod tests { - use super::*; - #[tokio::test] async fn test() { use crate::{ types::{ Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind }, - dispatcher::simple::Dispatcher, + dispatcher::{simple::Dispatcher, updater::StreamUpdater}, }; let mes = Message { @@ -249,6 +207,9 @@ mod tests { let mut dp = Dispatcher::new() .message_handler(true, handler); - dp.dispatch(tokio::stream::iter(vec![Update { id: 0, kind: UpdateKind::Message(mes) }])).await; + use futures::future::ready; + use futures::stream; + + dp.dispatch(StreamUpdater::new(stream::once(ready(Result::<_, ()>::Ok(Update { id: 0, kind: UpdateKind::Message(mes) }))))).await; } } diff --git a/src/dispatcher/updater.rs b/src/dispatcher/updater.rs new file mode 100644 index 00000000..6574b738 --- /dev/null +++ b/src/dispatcher/updater.rs @@ -0,0 +1,62 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project::pin_project; +use futures::{Stream, StreamExt, stream}; + +use crate::{ + bot::Bot, + requests::Request, + types::Update, + RequestError, +}; + +// Currently just a placeholder, but I'll add here some methods +pub trait Updater: Stream> {} + +#[pin_project] +pub struct StreamUpdater { + #[pin] + stream: S +} + +impl StreamUpdater { + pub fn new(stream: S) -> Self { + Self { stream } + } +} + +impl Stream for StreamUpdater where S: Stream> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_next(cx) + } +} + +impl Updater for StreamUpdater where S: Stream> {} + +pub fn polling<'a>(bot: &'a Bot) -> impl Updater + 'a/*StreamUpdater> + 'a>*/ { + let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move { + // this match converts Result, _> -> Vec> + let updates = match bot.get_updates().offset(offset).send().await { + Ok(updates) => { + if let Some(upd) = updates.last() { + offset = upd.id + 1; + } + updates.into_iter().map(|u| Ok(u)).collect::>() + }, + Err(err) => vec![Err(err)] + }; + Some((stream::iter(updates), (bot, offset))) + }) + .flatten(); + + StreamUpdater { stream } +} + +// TODO implement webhook (this actually require webserver and probably we +// should add cargo feature that adds webhook) +//pub fn webhook<'a>(bot: &'a Bot, cfg: WebhookConfig) -> Updater> + 'a> {} From c51aa9728a09035348684887ec64e5c405e40e27 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 5 Oct 2019 00:28:40 +0300 Subject: [PATCH 09/15] Errors in handlers --- src/dispatcher/handler.rs | 41 +++++++++++++++++++++------- src/dispatcher/simple/mod.rs | 53 +++++++++++++++++++++--------------- 2 files changed, 62 insertions(+), 32 deletions(-) diff --git a/src/dispatcher/handler.rs b/src/dispatcher/handler.rs index c17dcc9d..7c52928b 100644 --- a/src/dispatcher/handler.rs +++ b/src/dispatcher/handler.rs @@ -1,17 +1,38 @@ +use futures::FutureExt; use std::future::Future; use std::pin::Pin; -/// Asynchronous handler for event `I` (like `&self, I -> Future` fn) -pub trait Handler<'a, I> { - fn handle(&self, value: I) -> Pin + 'a>>; +pub type HandlerResult = Result<(), E>; + +/// Asynchronous handler for event `T` (like `&self, I -> Future` fn) +pub trait Handler<'a, T, E> { + fn handle(&self, value: T) -> Pin> + 'a>>; } -impl<'a, Fut, T, F> Handler<'a, T> for F -where - Fut: Future + 'a, - F: Fn(T) -> Fut, -{ - fn handle(&self, value: T) -> Pin + 'a>> { - Box::pin((self)(value)) +pub trait IntoHandlerResult { + fn into_hr(self) -> HandlerResult; +} + +impl IntoHandlerResult for () { + fn into_hr(self) -> HandlerResult { + Ok(()) + } +} + +impl IntoHandlerResult for HandlerResult { + fn into_hr(self) -> HandlerResult { + self + } +} + +impl<'a, F, Fut, R, T, E> Handler<'a, T, E> for F +where + F: Fn(T) -> Fut, + Fut: Future + 'a, + R: IntoHandlerResult + 'a, + E: 'a, +{ + fn handle(&self, value: T) -> Pin> + 'a>> { + Box::pin(self(value).map(IntoHandlerResult::into_hr)) } } diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index 46f54559..7655d6af 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -16,19 +16,19 @@ use crate::{ use futures::StreamExt; -pub type Handlers<'a, T> = Vec<(Box + 'a>, Box + 'a>)>; +pub type Handlers<'a, T, E> = Vec<(Box + 'a>, Box + 'a>)>; -pub struct Dispatcher<'a> { - message_handlers: Handlers<'a, Message>, - edited_message_handlers: Handlers<'a, Message>, - channel_post_handlers: Handlers<'a, Message>, - edited_channel_post_handlers: Handlers<'a, Message>, - inline_query_handlers: Handlers<'a, ()>, - chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult>, - callback_query_handlers: Handlers<'a, CallbackQuery>, +pub struct Dispatcher<'a, E> { + message_handlers: Handlers<'a, Message, E>, + edited_message_handlers: Handlers<'a, Message, E>, + channel_post_handlers: Handlers<'a, Message, E>, + edited_channel_post_handlers: Handlers<'a, Message, E>, + inline_query_handlers: Handlers<'a, (), E>, + chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, E>, + callback_query_handlers: Handlers<'a, CallbackQuery, E>, } -impl<'a> Dispatcher<'a> { +impl<'a, E> Dispatcher<'a, E> { pub fn new() -> Self { Dispatcher { message_handlers: Vec::new(), @@ -44,7 +44,7 @@ impl<'a> Dispatcher<'a> { pub fn message_handler(mut self, filter: F, handler: H) -> Self where F: Filter + 'a, - H: Handler<'a, Message> + 'a, + H: Handler<'a, Message, E> + 'a, { self.message_handlers.push((Box::new(filter), Box::new(handler))); self @@ -53,7 +53,7 @@ impl<'a> Dispatcher<'a> { pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self where F: Filter + 'a, - H: Handler<'a, Message> + 'a, + H: Handler<'a, Message, E> + 'a, { self.edited_message_handlers.push((Box::new(filter), Box::new(handler))); self @@ -62,7 +62,7 @@ impl<'a> Dispatcher<'a> { pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self where F: Filter + 'a, - H: Handler<'a, Message> + 'a, + H: Handler<'a, Message, E> + 'a, { self.channel_post_handlers.push((Box::new(filter), Box::new(handler))); self @@ -71,7 +71,7 @@ impl<'a> Dispatcher<'a> { pub fn edited_channel_post_handler(mut self, filter: F, handler: H) -> Self where F: Filter + 'a, - H: Handler<'a, Message> + 'a, + H: Handler<'a, Message, E> + 'a, { self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler))); self @@ -80,7 +80,7 @@ impl<'a> Dispatcher<'a> { pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self where F: Filter<()> + 'a, - H: Handler<'a, ()> + 'a, + H: Handler<'a, (), E> + 'a, { self.inline_query_handlers.push((Box::new(filter), Box::new(handler))); self @@ -89,7 +89,7 @@ impl<'a> Dispatcher<'a> { pub fn chosen_inline_result_handler(mut self, filter: F, handler: H) -> Self where F: Filter + 'a, - H: Handler<'a, ChosenInlineResult> + 'a, + H: Handler<'a, ChosenInlineResult, E> + 'a, { self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler))); self @@ -98,16 +98,16 @@ impl<'a> Dispatcher<'a> { pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self where F: Filter + 'a, - H: Handler<'a, CallbackQuery> + 'a, + H: Handler<'a, CallbackQuery, E> + 'a, { self.callback_query_handlers.push((Box::new(filter), Box::new(handler))); self } // TODO: Can someone simplify this? - pub async fn dispatch(&mut self, updates: U) + pub async fn dispatch(&mut self, updates: U) where - U: Updater + 'a + U: Updater + 'a { updates.for_each(|res| { async { @@ -133,7 +133,7 @@ impl<'a> Dispatcher<'a> { }); match handler { - Some(handler) => handler.handle(value).await, + Some(handler) => { handler.handle(value).await; /* todo */ }, None => log::warn!("Unhandled update: {:?}", value) } }}; @@ -157,6 +157,8 @@ impl<'a> Dispatcher<'a> { #[cfg(test)] mod tests { + use std::convert::Infallible; + #[tokio::test] async fn test() { use crate::{ @@ -204,8 +206,15 @@ mod tests { println!("{:#?}", mes) } - let mut dp = Dispatcher::new() - .message_handler(true, handler); + async fn handler2(mes: Message) -> Result<(), Infallible>{ + println!("{:#?}", mes); + + Ok(()) + } + + let mut dp = Dispatcher::::new() + .message_handler(true, handler) + .message_handler(true, handler2); use futures::future::ready; use futures::stream; From 64111f4ae921fc3ab5273fd782a33adc8a30efe0 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 5 Oct 2019 00:54:53 +0300 Subject: [PATCH 10/15] Actual error handling (meh, we can do it better probably) --- src/dispatcher/simple/mod.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index 7655d6af..1e08c11f 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -14,6 +14,8 @@ use crate::{ }; use futures::StreamExt; +use std::pin::Pin; +use std::future::Future; pub type Handlers<'a, T, E> = Vec<(Box + 'a>, Box + 'a>)>; @@ -26,6 +28,7 @@ pub struct Dispatcher<'a, E> { inline_query_handlers: Handlers<'a, (), E>, chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, E>, callback_query_handlers: Handlers<'a, CallbackQuery, E>, + error_handler: Option Pin + 'a>>>>, } impl<'a, E> Dispatcher<'a, E> { @@ -37,7 +40,8 @@ impl<'a, E> Dispatcher<'a, E> { edited_channel_post_handlers: Vec::new(), inline_query_handlers: Vec::new(), chosen_inline_result_handlers: Vec::new(), - callback_query_handlers: Vec::new() + callback_query_handlers: Vec::new(), + error_handler: None, } } @@ -104,6 +108,16 @@ impl<'a, E> Dispatcher<'a, E> { self } + // TODO: rework error handling + pub fn set_error_handler(mut self, handler: H) -> Self + where + H: Fn(E) -> Fut + 'static, + Fut: Future + 'a, + { + self.error_handler = Some(Box::new(move |e| Box::pin(handler(e)))); + self + } + // TODO: Can someone simplify this? pub async fn dispatch(&mut self, updates: U) where @@ -133,7 +147,13 @@ impl<'a, E> Dispatcher<'a, E> { }); match handler { - Some(handler) => { handler.handle(value).await; /* todo */ }, + Some(handler) => { + if let Err(err) = handler.handle(value).await { + if let Some(h) = &self.error_handler { + h(err).await; + } + } + }, None => log::warn!("Unhandled update: {:?}", value) } }}; From 6bda45e2aa918bd5751277c57a76b2cc6b822d3b Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 5 Oct 2019 01:19:32 +0300 Subject: [PATCH 11/15] Use 'error policy' to handle errors in dispatcher --- src/dispatcher/simple/error_policy.rs | 36 +++++++++++++++++++++++++++ src/dispatcher/simple/mod.rs | 32 ++++++++++-------------- 2 files changed, 49 insertions(+), 19 deletions(-) create mode 100644 src/dispatcher/simple/error_policy.rs diff --git a/src/dispatcher/simple/error_policy.rs b/src/dispatcher/simple/error_policy.rs new file mode 100644 index 00000000..d71b254b --- /dev/null +++ b/src/dispatcher/simple/error_policy.rs @@ -0,0 +1,36 @@ +use std::pin::Pin; +use std::future::Future; +use std::fmt::Debug; + +// TODO: shouldn't it be trait? +pub enum ErrorPolicy<'a, E> { + Ignore, + Log, + Custom(Box Pin + 'a>>>), +} + +impl<'a, E> ErrorPolicy<'a, E> +where + E: Debug, +{ + pub async fn handle_error(&self, error: E) { + match self { + Self::Ignore => {}, + Self::Log => { + // TODO: better message + log::error!("Error in handler: {:?}", error) + } + Self::Custom(func) => { + func(error).await + } + } + } + + pub fn custom(f: F) -> Self + where + F: Fn(E) -> Fut + 'static, + Fut: Future + 'a, + { + Self::Custom(Box::new(move |e| Box::pin(f(e)))) + } +} \ No newline at end of file diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index 1e08c11f..3961258b 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -1,3 +1,5 @@ +pub mod error_policy; + use crate::{ dispatcher::{ filter::Filter, @@ -16,6 +18,7 @@ use crate::{ use futures::StreamExt; use std::pin::Pin; use std::future::Future; +use crate::dispatcher::simple::error_policy::ErrorPolicy; pub type Handlers<'a, T, E> = Vec<(Box + 'a>, Box + 'a>)>; @@ -28,11 +31,14 @@ pub struct Dispatcher<'a, E> { inline_query_handlers: Handlers<'a, (), E>, chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, E>, callback_query_handlers: Handlers<'a, CallbackQuery, E>, - error_handler: Option Pin + 'a>>>>, + error_policy: ErrorPolicy<'a, E>, } -impl<'a, E> Dispatcher<'a, E> { - pub fn new() -> Self { +impl<'a, E> Dispatcher<'a, E> +where + E: std::fmt::Debug, // TODO: Is this really necessary? +{ + pub fn new(error_policy: ErrorPolicy<'a, E>) -> Self { Dispatcher { message_handlers: Vec::new(), edited_message_handlers: Vec::new(), @@ -41,7 +47,7 @@ impl<'a, E> Dispatcher<'a, E> { inline_query_handlers: Vec::new(), chosen_inline_result_handlers: Vec::new(), callback_query_handlers: Vec::new(), - error_handler: None, + error_policy } } @@ -108,16 +114,6 @@ impl<'a, E> Dispatcher<'a, E> { self } - // TODO: rework error handling - pub fn set_error_handler(mut self, handler: H) -> Self - where - H: Fn(E) -> Fut + 'static, - Fut: Future + 'a, - { - self.error_handler = Some(Box::new(move |e| Box::pin(handler(e)))); - self - } - // TODO: Can someone simplify this? pub async fn dispatch(&mut self, updates: U) where @@ -149,9 +145,7 @@ impl<'a, E> Dispatcher<'a, E> { match handler { Some(handler) => { if let Err(err) = handler.handle(value).await { - if let Some(h) = &self.error_handler { - h(err).await; - } + self.error_policy.handle_error(err).await; } }, None => log::warn!("Unhandled update: {:?}", value) @@ -185,7 +179,7 @@ mod tests { types::{ Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind }, - dispatcher::{simple::Dispatcher, updater::StreamUpdater}, + dispatcher::{simple::{Dispatcher, error_policy::ErrorPolicy}, updater::StreamUpdater}, }; let mes = Message { @@ -232,7 +226,7 @@ mod tests { Ok(()) } - let mut dp = Dispatcher::::new() + let mut dp = Dispatcher::::new(ErrorPolicy::Ignore) .message_handler(true, handler) .message_handler(true, handler2); From 4c00f48339eae03c07497a1924c4f010ece9e22a Mon Sep 17 00:00:00 2001 From: Waffle Date: Sun, 6 Oct 2019 11:19:02 +0300 Subject: [PATCH 12/15] Add doc comment for `dispatcher::simple::Dispatcher`, fix doc comment for `dispatcher` module --- src/dispatcher/mod.rs | 3 +- src/dispatcher/simple/mod.rs | 57 ++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/src/dispatcher/mod.rs b/src/dispatcher/mod.rs index 6805dd8f..a4d5c310 100644 --- a/src/dispatcher/mod.rs +++ b/src/dispatcher/mod.rs @@ -1,4 +1,5 @@ -/// POC implementation of update dispatching +//! POC implementation of update dispatching + pub mod filter; pub mod handler; pub mod simple; diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index 3961258b..79ec7bb9 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -16,13 +16,64 @@ use crate::{ }; use futures::StreamExt; -use std::pin::Pin; -use std::future::Future; use crate::dispatcher::simple::error_policy::ErrorPolicy; -pub type Handlers<'a, T, E> = Vec<(Box + 'a>, Box + 'a>)>; +type Handlers<'a, T, E> = Vec<(Box + 'a>, Box + 'a>)>; +/// Dispatcher that dispatches updates from telegram. +/// +/// This is 'simple' implementation with following limitations: +/// - Error (`E` generic parameter) _must_ implement [`std::fmt::Debug`] +/// - All 'handlers' are boxed +/// - Handler's fututres are also boxed +/// - [Custom error policy] is also boxed +/// - All errors from [updater] are ignored (TODO: remove this limitation) +/// - All handlers executed in order (this means that in dispatcher have +/// 2 upadtes it will first execute some handler into complition with +/// first update and **then** search for handler for second update, +/// this is probably wrong) +/// +/// ## Examples +/// +/// Simplest example: +/// ```no_run +/// # async fn run() { +/// use std::convert::Infallible; +/// use async_telegram_bot::{ +/// bot::Bot, +/// types::Message, +/// dispatcher::{ +/// updater::polling, +/// simple::{Dispatcher, error_policy::ErrorPolicy}, +/// }, +/// }; +/// +/// async fn handle_edited_message(mes: Message) { +/// println!("Edited message: {:?}", mes) +/// } +/// +/// let bot = Bot::new("TOKEN"); +/// +/// // create dispatcher which handlers can't fail +/// // with error policy that just ignores all errors (that can't ever happen) +/// let mut dp = Dispatcher::::new(ErrorPolicy::Ignore) +/// // Add 'handler' that will handle all messages sent to the bot +/// .message_handler(true, |mes: Message| async move { +/// println!("New message: {:?}", mes) +/// }) +/// // Add 'handler' that will handle all +/// // messages edited in chat with the bot +/// .edited_message_handler(true, edit); +/// +/// // Start dispatching updates from long polling +/// dp.dispatch(polling(&bot)).await; +/// # } +/// ``` +/// +/// [`std::fmt::Debug`]: std::fmt::Debug +/// [Custom error policy]: crate::dispatcher::simple::error_policy::ErrorPolicy::Custom +/// [updater]: crate::dispatcher::updater pub struct Dispatcher<'a, E> { message_handlers: Handlers<'a, Message, E>, edited_message_handlers: Handlers<'a, Message, E>, From 4c478b1f77a22451ff9a4bc8fc376e1ddec26f01 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sun, 6 Oct 2019 11:19:48 +0300 Subject: [PATCH 13/15] Rewrite simple dispatcher's test for a bit --- src/dispatcher/simple/mod.rs | 59 ++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index 79ec7bb9..87d8575f 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -223,17 +223,38 @@ where #[cfg(test)] mod tests { use std::convert::Infallible; + use std::sync::atomic::{AtomicI32, Ordering}; + + use crate::{ + types::{ + Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind + }, + dispatcher::{simple::{Dispatcher, error_policy::ErrorPolicy}, updater::StreamUpdater}, + }; + use futures::Stream; #[tokio::test] - async fn test() { - use crate::{ - types::{ - Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind - }, - dispatcher::{simple::{Dispatcher, error_policy::ErrorPolicy}, updater::StreamUpdater}, - }; + async fn first_handler_executes_1_time() { + let counter = &AtomicI32::new(0); + let counter2 = &AtomicI32::new(0); - let mes = Message { + let mut dp = Dispatcher::::new(ErrorPolicy::Ignore) + .message_handler(true, |_mes: Message| async move { + counter.fetch_add(1, Ordering::SeqCst); + }) + .message_handler(true, |_mes: Message| async move { + counter2.fetch_add(1, Ordering::SeqCst); + Ok::<_, Infallible>(()) + }); + + dp.dispatch(one_message_updater()).await; + + assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(counter2.load(Ordering::SeqCst), 0); + } + + fn message() -> Message { + Message { id: 6534, date: 1567898953, chat: Chat { @@ -265,25 +286,19 @@ mod tests { }, reply_markup: None, }, - }; - - async fn handler(mes: Message) { - println!("{:#?}", mes) } + } - async fn handler2(mes: Message) -> Result<(), Infallible>{ - println!("{:#?}", mes); - - Ok(()) - } - - let mut dp = Dispatcher::::new(ErrorPolicy::Ignore) - .message_handler(true, handler) - .message_handler(true, handler2); + fn message_update() -> Update { + Update { id: 0, kind: UpdateKind::Message(message()) } + } + fn one_message_updater() -> StreamUpdater>> { use futures::future::ready; use futures::stream; - dp.dispatch(StreamUpdater::new(stream::once(ready(Result::<_, ()>::Ok(Update { id: 0, kind: UpdateKind::Message(mes) }))))).await; + StreamUpdater::new( + stream::once(ready(Ok(message_update()))) + ) } } From 553b1666a6a2c59162a03987abc0b95986f19a70 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sun, 6 Oct 2019 11:21:18 +0300 Subject: [PATCH 14/15] Fyx typo --- src/dispatcher/simple/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index 87d8575f..63ce3a28 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -64,7 +64,7 @@ type Handlers<'a, T, E> = Vec<(Box + 'a>, Box Date: Wed, 9 Oct 2019 21:17:45 +0300 Subject: [PATCH 15/15] Add doc comment for updater trait --- src/bot/api.rs | 3 +- src/dispatcher/updater.rs | 89 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/src/bot/api.rs b/src/bot/api.rs index 52d8a409..0472d84d 100644 --- a/src/bot/api.rs +++ b/src/bot/api.rs @@ -1,10 +1,9 @@ -use crate::requests::get_updates::GetUpdates; use crate::{ bot::Bot, requests::{ ChatId, EditMessageLiveLocation, ForwardMessage, GetFile, GetMe, SendAudio, SendLocation, SendMediaGroup, SendMessage, SendPhoto, - StopMessageLiveLocation, + StopMessageLiveLocation, GetUpdates }, types::{InputFile, InputMedia}, }; diff --git a/src/dispatcher/updater.rs b/src/dispatcher/updater.rs index 6574b738..8dd14da1 100644 --- a/src/dispatcher/updater.rs +++ b/src/dispatcher/updater.rs @@ -8,12 +8,97 @@ use futures::{Stream, StreamExt, stream}; use crate::{ bot::Bot, - requests::Request, types::Update, RequestError, }; // Currently just a placeholder, but I'll add here some methods +/// Updater is stream of updates. +/// +/// Telegram supports 2 ways of [getting updates]: [long polling](Long Polling) and webhook +/// +/// ## Long Polling +/// +/// In long polling ([wiki]) you just call [GetUpdates] every N seconds. +/// +/// #### Example: +/// +/// +/// +/// ^1 Timeout can be even 0 +/// (this is also called short polling), +/// but you should use it **only** for testing purposes +/// +/// ^2 Large delays will cause in bot lags, +/// so delay shouldn't exceed second. +/// +/// ^3 Note that if telegram already have updates for +/// you it will answer you **without** waiting for a timeout +/// +/// ^4 `offset = N` means that we've already received +/// updates `0..=N` +/// +/// [GetUpdates]: crate::requests::GetUpdates +/// [getting updates]: https://core.telegram.org/bots/api#getting-updates +/// [wiki]: https://en.wikipedia.org/wiki/Push_technology#Long_polling pub trait Updater: Stream> {} #[pin_project] @@ -38,7 +123,7 @@ impl Stream for StreamUpdater where S: Stream> { impl Updater for StreamUpdater where S: Stream> {} -pub fn polling<'a>(bot: &'a Bot) -> impl Updater + 'a/*StreamUpdater> + 'a>*/ { +pub fn polling<'a>(bot: &'a Bot) -> impl Updater + 'a { let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move { // this match converts Result, _> -> Vec> let updates = match bot.get_updates().offset(offset).send().await {
+///     tg                           bot
+///      |                            |
+///      |<---------------------------| Updates? (GetUpdates call)
+///      ↑                            ↑
+///      |          timeout^1         |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      ↑                            ↑
+///      | delay between GetUpdates^2 |
+///      ↓                            ↓
+///      |<---------------------------| Updates?
+///      ↑                            ↑
+///      |          timeout^3         |
+///      ↓                            ↓
+/// Yes  |-------[updates 0, 1]------>|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 1]--------| Updates?^4
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Yes  |---------[update 2]-------->|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 2]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 2]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 2]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Yes  |-------[updates 2..5]------>|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 5]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      |                            |
+///      ~    and so on, and so on    ~
+///