diff --git a/Cargo.toml b/Cargo.toml index ab47822f..6c7b54f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ serde = { version = "1.0.101", features = ["derive"] } derive_more = "0.15.0" tokio = "0.2.0-alpha.6" bytes = "0.4.12" +log = "0.4.8" +pin-project = "0.4.0-alpha.7" futures-preview = "0.3.0-alpha.19" async-trait = "0.1.13" thiserror = "1.0.2" diff --git a/src/bot/api.rs b/src/bot/api.rs index 3c34f8af..a75328c1 100644 --- a/src/bot/api.rs +++ b/src/bot/api.rs @@ -6,7 +6,7 @@ use crate::{ PromoteChatMember, RestrictChatMember, SendAudio, SendChatAction, SendContact, SendLocation, SendMediaGroup, SendMessage, SendPhoto, SendPoll, SendVenue, SendVideoNote, SendVoice, StopMessageLiveLocation, - UnbanChatMember, UnpinChatMessage, + UnbanChatMember, UnpinChatMessage, GetUpdates }, types::{ChatAction, ChatId, ChatPermissions, InputFile, InputMedia}, }; @@ -17,6 +17,10 @@ impl Bot { GetMe::new(self.ctx()) } + pub fn get_updates(&self) -> GetUpdates { + GetUpdates::new(self.ctx()) + } + pub fn send_message(&self, chat_id: C, text: T) -> SendMessage where C: Into, diff --git a/src/dispatcher/filter.rs b/src/dispatcher/filter.rs new file mode 100644 index 00000000..e1c267ad --- /dev/null +++ b/src/dispatcher/filter.rs @@ -0,0 +1,372 @@ +/// Filter that determines that particular event +/// is suitable for particular handler. +pub trait Filter { + /// Passes (return true) if event is suitable (otherwise return false) + fn test(&self, value: &T) -> bool; +} + +/// ``` +/// use async_telegram_bot::dispatcher::filter::Filter; +/// +/// let closure = |i: &i32| -> bool { *i >= 42 }; +/// assert!(closure.test(&42)); +/// assert!(closure.test(&100)); +/// +/// assert_eq!(closure.test(&41), false); +/// assert_eq!(closure.test(&0), false); +/// ``` +impl bool> Filter for F { + fn test(&self, value: &T) -> bool { + (self)(value) + } +} + +/// ``` +/// use async_telegram_bot::dispatcher::filter::Filter; +/// +/// assert!(true.test(&())); +/// assert_eq!(false.test(&()), false); +/// ``` +impl Filter for bool { + fn test(&self, _: &T) -> bool { *self } +} + +/// And filter. +/// +/// Passes if both underlying filters pass. +/// +/// **NOTE**: if one of filters don't pass +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{And, Filter}; +/// +/// // Note: bool can be treated as `Filter` that always return self. +/// assert_eq!(And::new(true, false).test(&()), false); +/// assert_eq!(And::new(true, false).test(&()), false); +/// assert!(And::new(true, true).test(&())); +/// assert!(And::new(true, And::new(|_: &()| true, true)).test(&())); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct And(A, B); + +impl And { + pub fn new(a: A, b: B) -> Self { + And(a, b) + } +} + +impl Filter for And +where + A: Filter, + B: Filter, +{ + fn test(&self, value: &T) -> bool { + self.0.test(value) && self.1.test(value) + } +} + +/// Alias for [`And::new`] +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{and, Filter}; +/// +/// assert!(and(true, true).test(&())); +/// assert_eq!(and(true, false).test(&()), false); +/// ``` +/// +/// [`And::new`]: crate::dispatcher::filter::And::new +pub fn and(a: A, b: B) -> And { + And::new(a, b) +} + + +/// Or filter. +/// +/// Passes if at least one underlying filters passes. +/// +/// **NOTE**: if one of filters passes +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{Or, Filter}; +/// +/// // Note: bool can be treated as `Filter` that always return self. +/// assert!(Or::new(true, false).test(&())); +/// assert!(Or::new(false, true).test(&())); +/// assert!(Or::new(false, Or::new(|_: &()| true, false)).test(&())); +/// assert_eq!(Or::new(false, false).test(&()), false); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct Or(A, B); + +impl Or { + pub fn new(a: A, b: B) -> Self { + Or(a, b) + } +} + +impl Filter for Or +where + A: Filter, + B: Filter, +{ + fn test(&self, value: &T) -> bool { + self.0.test(value) || self.1.test(value) + } +} + +/// Alias for [`Or::new`] +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{or, Filter}; +/// +/// assert!(or(true, false).test(&())); +/// assert_eq!(or(false, false).test(&()), false); +/// ``` +/// +/// [`Or::new`]: crate::dispatcher::filter::Or::new +pub fn or(a: A, b: B) -> Or { + Or::new(a, b) +} + + +/// Not filter. +/// +/// Passes if underlying filter don't pass. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{Not, Filter}; +/// +/// // Note: bool can be treated as `Filter` that always return self. +/// assert!(Not::new(false).test(&())); +/// assert_eq!(Not::new(true).test(&()), false); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct Not(A); + +impl Not { + pub fn new(a: A) -> Self { + Not(a) + } +} + +impl Filter for Not +where + A: Filter, +{ + fn test(&self, value: &T) -> bool { + !self.0.test(value) + } +} + +/// Alias for [`Not::new`] +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{not, Filter}; +/// +/// assert!(not(false).test(&())); +/// assert_eq!(not(true).test(&()), false); +/// ``` +/// +/// [`Not::new`]: crate::dispatcher::filter::Not::new +pub fn not(a: A) -> Not { + Not::new(a) +} + +/// Return [filter] that passes if and only if all of the given filters passes. +/// +/// **NOTE**: if one of filters don't pass +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::{all, dispatcher::filter::Filter}; +/// +/// assert!(all![true].test(&())); +/// assert!(all![true, true].test(&())); +/// assert!(all![true, true, true].test(&())); +/// +/// assert_eq!(all![false].test(&()), false); +/// assert_eq!(all![true, false].test(&()), false); +/// assert_eq!(all![false, true].test(&()), false); +/// assert_eq!(all![false, false].test(&()), false); +/// ``` +/// +/// [filter]: crate::dispatcher::filter::Filter +#[macro_export] +macro_rules! all { + ($one:expr) => { $one }; + ($head:expr, $($tail:tt)+) => { + $crate::dispatcher::filter::And::new( + $head, + $crate::all!($($tail)+) + ) + }; +} + +/// Return [filter] that passes if any of the given filters passes. +/// +/// **NOTE**: if one of filters passes +/// it is **not** guaranteed that other will be executed. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::{any, dispatcher::filter::Filter}; +/// +/// assert!(any![true].test(&())); +/// assert!(any![true, true].test(&())); +/// assert!(any![false, true].test(&())); +/// assert!(any![true, false, true].test(&())); +/// +/// assert_eq!(any![false].test(&()), false); +/// assert_eq!(any![false, false].test(&()), false); +/// assert_eq!(any![false, false, false].test(&()), false); +/// ``` +/// +/// [filter]: crate::dispatcher::filter::Filter +#[macro_export] +macro_rules! any { + ($one:expr) => { $one }; + ($head:expr, $($tail:tt)+) => { + $crate::dispatcher::filter::Or::new( + $head, + $crate::all!($($tail)+) + ) + }; +} + + +/// Simple wrapper around `Filter` that adds `|` and `&` operators. +/// +/// ## Examples +/// ``` +/// use async_telegram_bot::dispatcher::filter::{Filter, f, F, And, Or}; +/// +/// let flt1 = |i: &i32| -> bool { *i > 17 }; +/// let flt2 = |i: &i32| -> bool { *i < 42 }; +/// let flt3 = |i: &i32| -> bool { *i % 2 == 0 }; +/// +/// let and = f(flt1) & flt2; +/// assert!(and.test(&19)); // both filters pass +/// +/// assert_eq!(and.test(&50), false); // `flt2` doesn't pass +/// assert_eq!(and.test(&16), false); // `flt1` doesn't pass +/// +/// +/// let or = f(flt1) | flt3; +/// assert!(or.test(&19)); // `flt1` passes +/// assert!(or.test(&16)); // `flt2` passes +/// assert!(or.test(&20)); // both pass +/// +/// assert_eq!(or.test(&17), false); // both don't pass +/// +/// +/// // Note: only first filter in chain should be wrapped in `f(...)` +/// let complicated: F, _>>= f(flt1) & flt2 | flt3; +/// assert!(complicated.test(&2)); // `flt3` passes +/// assert!(complicated.test(&21)); // `flt1` and `flt2` pass +/// +/// assert_eq!(complicated.test(&15), false); // `flt1` and `flt3` don't pass +/// assert_eq!(complicated.test(&43), false); // `flt2` and `flt3` don't pass +/// ``` +pub struct F(A); + +/// Constructor fn for [F] +/// +/// [F]: crate::dispatcher::filter::F; +pub fn f(a: A) -> F { + F(a) +} + +impl Filter for F +where + A: Filter +{ + fn test(&self, value: &T) -> bool { + self.0.test(value) + } +} + +impl std::ops::BitAnd for F { + type Output = F>; + + fn bitand(self, other: B) -> Self::Output { + f(and(self.0, other)) + } +} + +impl std::ops::BitOr for F { + type Output = F>; + + fn bitor(self, other: B) -> Self::Output { + f(or(self.0, other)) + } +} + +/// Extensions for filters +pub trait FilterExt { + /// Alias for [`Not::new`] + /// + /// ## Examples + /// ``` + /// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt}; + /// + /// let flt = |i: &i32| -> bool { *i > 0 }; + /// let flt = flt.not(); + /// assert!(flt.test(&-1)); + /// assert_eq!(flt.test(&1), false); + /// ``` + /// + /// [`Not::new`]: crate::dispatcher::filter::Not::new + fn not(self) -> Not where Self: Sized { + Not::new(self) + } + + /// Alias for [`And::new`] + /// + /// ## Examples + /// ``` + /// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt}; + /// + /// let flt = |i: &i32| -> bool { *i > 0 }; + /// let flt = flt.and(|i: &i32| *i < 42); + /// + /// assert!(flt.test(&1)); + /// assert_eq!(flt.test(&-1), false); + /// assert_eq!(flt.test(&43), false); + /// ``` + /// + /// [`Not::new`]: crate::dispatcher::filter::And::new + fn and(self, other: B) -> And where Self: Sized { + And::new(self, other) + } + + /// Alias for [`Or::new`] + /// + /// ## Examples + /// ``` + /// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt}; + /// + /// let flt = |i: &i32| -> bool { *i < 0 }; + /// let flt = flt.or(|i: &i32| *i > 42); + /// + /// assert!(flt.test(&-1)); + /// assert!(flt.test(&43)); + /// assert_eq!(flt.test(&17), false); + /// ``` + /// + /// [`Not::new`]: crate::dispatcher::filter::Or::new + fn or(self, other: B) -> Or where Self: Sized { + Or::new(self, other) + } +} + +// All methods implemented via defaults +impl FilterExt for F where F: Filter {} diff --git a/src/dispatcher/handler.rs b/src/dispatcher/handler.rs new file mode 100644 index 00000000..7c52928b --- /dev/null +++ b/src/dispatcher/handler.rs @@ -0,0 +1,38 @@ +use futures::FutureExt; +use std::future::Future; +use std::pin::Pin; + +pub type HandlerResult = Result<(), E>; + +/// Asynchronous handler for event `T` (like `&self, I -> Future` fn) +pub trait Handler<'a, T, E> { + fn handle(&self, value: T) -> Pin> + 'a>>; +} + +pub trait IntoHandlerResult { + fn into_hr(self) -> HandlerResult; +} + +impl IntoHandlerResult for () { + fn into_hr(self) -> HandlerResult { + Ok(()) + } +} + +impl IntoHandlerResult for HandlerResult { + fn into_hr(self) -> HandlerResult { + self + } +} + +impl<'a, F, Fut, R, T, E> Handler<'a, T, E> for F +where + F: Fn(T) -> Fut, + Fut: Future + 'a, + R: IntoHandlerResult + 'a, + E: 'a, +{ + fn handle(&self, value: T) -> Pin> + 'a>> { + Box::pin(self(value).map(IntoHandlerResult::into_hr)) + } +} diff --git a/src/dispatcher/mod.rs b/src/dispatcher/mod.rs new file mode 100644 index 00000000..a4d5c310 --- /dev/null +++ b/src/dispatcher/mod.rs @@ -0,0 +1,9 @@ +//! POC implementation of update dispatching + +pub mod filter; +pub mod handler; +pub mod simple; +pub mod updater; + +pub use filter::Filter; +pub use handler::Handler; diff --git a/src/dispatcher/simple/error_policy.rs b/src/dispatcher/simple/error_policy.rs new file mode 100644 index 00000000..d71b254b --- /dev/null +++ b/src/dispatcher/simple/error_policy.rs @@ -0,0 +1,36 @@ +use std::pin::Pin; +use std::future::Future; +use std::fmt::Debug; + +// TODO: shouldn't it be trait? +pub enum ErrorPolicy<'a, E> { + Ignore, + Log, + Custom(Box Pin + 'a>>>), +} + +impl<'a, E> ErrorPolicy<'a, E> +where + E: Debug, +{ + pub async fn handle_error(&self, error: E) { + match self { + Self::Ignore => {}, + Self::Log => { + // TODO: better message + log::error!("Error in handler: {:?}", error) + } + Self::Custom(func) => { + func(error).await + } + } + } + + pub fn custom(f: F) -> Self + where + F: Fn(E) -> Fut + 'static, + Fut: Future + 'a, + { + Self::Custom(Box::new(move |e| Box::pin(f(e)))) + } +} \ No newline at end of file diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs new file mode 100644 index 00000000..63ce3a28 --- /dev/null +++ b/src/dispatcher/simple/mod.rs @@ -0,0 +1,304 @@ +pub mod error_policy; + +use crate::{ + dispatcher::{ + filter::Filter, + handler::Handler, + updater::Updater, + }, + types::{ + Update, + Message, + UpdateKind, + CallbackQuery, + ChosenInlineResult, + }, +}; + +use futures::StreamExt; +use crate::dispatcher::simple::error_policy::ErrorPolicy; + + +type Handlers<'a, T, E> = Vec<(Box + 'a>, Box + 'a>)>; + +/// Dispatcher that dispatches updates from telegram. +/// +/// This is 'simple' implementation with following limitations: +/// - Error (`E` generic parameter) _must_ implement [`std::fmt::Debug`] +/// - All 'handlers' are boxed +/// - Handler's fututres are also boxed +/// - [Custom error policy] is also boxed +/// - All errors from [updater] are ignored (TODO: remove this limitation) +/// - All handlers executed in order (this means that in dispatcher have +/// 2 upadtes it will first execute some handler into complition with +/// first update and **then** search for handler for second update, +/// this is probably wrong) +/// +/// ## Examples +/// +/// Simplest example: +/// ```no_run +/// # async fn run() { +/// use std::convert::Infallible; +/// use async_telegram_bot::{ +/// bot::Bot, +/// types::Message, +/// dispatcher::{ +/// updater::polling, +/// simple::{Dispatcher, error_policy::ErrorPolicy}, +/// }, +/// }; +/// +/// async fn handle_edited_message(mes: Message) { +/// println!("Edited message: {:?}", mes) +/// } +/// +/// let bot = Bot::new("TOKEN"); +/// +/// // create dispatcher which handlers can't fail +/// // with error policy that just ignores all errors (that can't ever happen) +/// let mut dp = Dispatcher::::new(ErrorPolicy::Ignore) +/// // Add 'handler' that will handle all messages sent to the bot +/// .message_handler(true, |mes: Message| async move { +/// println!("New message: {:?}", mes) +/// }) +/// // Add 'handler' that will handle all +/// // messages edited in chat with the bot +/// .edited_message_handler(true, handle_edited_message); +/// +/// // Start dispatching updates from long polling +/// dp.dispatch(polling(&bot)).await; +/// # } +/// ``` +/// +/// [`std::fmt::Debug`]: std::fmt::Debug +/// [Custom error policy]: crate::dispatcher::simple::error_policy::ErrorPolicy::Custom +/// [updater]: crate::dispatcher::updater +pub struct Dispatcher<'a, E> { + message_handlers: Handlers<'a, Message, E>, + edited_message_handlers: Handlers<'a, Message, E>, + channel_post_handlers: Handlers<'a, Message, E>, + edited_channel_post_handlers: Handlers<'a, Message, E>, + inline_query_handlers: Handlers<'a, (), E>, + chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, E>, + callback_query_handlers: Handlers<'a, CallbackQuery, E>, + error_policy: ErrorPolicy<'a, E>, +} + +impl<'a, E> Dispatcher<'a, E> +where + E: std::fmt::Debug, // TODO: Is this really necessary? +{ + pub fn new(error_policy: ErrorPolicy<'a, E>) -> Self { + Dispatcher { + message_handlers: Vec::new(), + edited_message_handlers: Vec::new(), + channel_post_handlers: Vec::new(), + edited_channel_post_handlers: Vec::new(), + inline_query_handlers: Vec::new(), + chosen_inline_result_handlers: Vec::new(), + callback_query_handlers: Vec::new(), + error_policy + } + } + + pub fn message_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler<'a, Message, E> + 'a, + { + self.message_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler<'a, Message, E> + 'a, + { + self.edited_message_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler<'a, Message, E> + 'a, + { + self.channel_post_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn edited_channel_post_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler<'a, Message, E> + 'a, + { + self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter<()> + 'a, + H: Handler<'a, (), E> + 'a, + { + self.inline_query_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn chosen_inline_result_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler<'a, ChosenInlineResult, E> + 'a, + { + self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self + where + F: Filter + 'a, + H: Handler<'a, CallbackQuery, E> + 'a, + { + self.callback_query_handlers.push((Box::new(filter), Box::new(handler))); + self + } + + // TODO: Can someone simplify this? + pub async fn dispatch(&mut self, updates: U) + where + U: Updater + 'a + { + updates.for_each(|res| { + async { + let res = res; + let Update { kind, id } = match res { + Ok(upd) => upd, + _ => return // TODO: proper error handling + }; + + log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); + + // TODO: can someone extract this to a function? + macro_rules! call { + ($h:expr, $value:expr) => {{ + let value = $value; + let handler = $h.iter().find_map(|e| { + let (filter, handler) = e; + if filter.test(&value) { + Some(handler) + } else { + None + } + }); + + match handler { + Some(handler) => { + if let Err(err) = handler.handle(value).await { + self.error_policy.handle_error(err).await; + } + }, + None => log::warn!("Unhandled update: {:?}", value) + } + }}; + } + + match kind { + UpdateKind::Message(mes) => call!(self.message_handlers, mes), + UpdateKind::EditedMessage(mes) => call!(self.edited_message_handlers, mes), + UpdateKind::ChannelPost(post) => call!(self.channel_post_handlers, post), + UpdateKind::EditedChannelPost(post) => call!(self.edited_channel_post_handlers, post), + UpdateKind::InlineQuery(query) => call!(self.inline_query_handlers, query), + UpdateKind::ChosenInlineResult(result) => call!(self.chosen_inline_result_handlers, result), + UpdateKind::CallbackQuery(callback) => call!(self.callback_query_handlers, callback), + } + } + }) + .await; + } +} + + +#[cfg(test)] +mod tests { + use std::convert::Infallible; + use std::sync::atomic::{AtomicI32, Ordering}; + + use crate::{ + types::{ + Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind + }, + dispatcher::{simple::{Dispatcher, error_policy::ErrorPolicy}, updater::StreamUpdater}, + }; + use futures::Stream; + + #[tokio::test] + async fn first_handler_executes_1_time() { + let counter = &AtomicI32::new(0); + let counter2 = &AtomicI32::new(0); + + let mut dp = Dispatcher::::new(ErrorPolicy::Ignore) + .message_handler(true, |_mes: Message| async move { + counter.fetch_add(1, Ordering::SeqCst); + }) + .message_handler(true, |_mes: Message| async move { + counter2.fetch_add(1, Ordering::SeqCst); + Ok::<_, Infallible>(()) + }); + + dp.dispatch(one_message_updater()).await; + + assert_eq!(counter.load(Ordering::SeqCst), 1); + assert_eq!(counter2.load(Ordering::SeqCst), 0); + } + + fn message() -> Message { + Message { + id: 6534, + date: 1567898953, + chat: Chat { + id: 218485655, + photo: None, + kind: ChatKind::Private { + type_: (), + first_name: Some("W".to_string()), + last_name: None, + username: Some("WaffleLapkin".to_string()), + }, + }, + kind: MessageKind::Common { + from: Sender::User(User { + id: 457569668, + is_bot: true, + first_name: "BT".to_string(), + last_name: None, + username: Some("BloodyTestBot".to_string()), + language_code: None, + }), + forward_kind: ForwardKind::Origin { + reply_to_message: None, + }, + edit_date: None, + media_kind: MediaKind::Text { + text: "text".to_string(), + entities: vec![], + }, + reply_markup: None, + }, + } + } + + fn message_update() -> Update { + Update { id: 0, kind: UpdateKind::Message(message()) } + } + + fn one_message_updater() -> StreamUpdater>> { + use futures::future::ready; + use futures::stream; + + StreamUpdater::new( + stream::once(ready(Ok(message_update()))) + ) + } +} diff --git a/src/dispatcher/updater.rs b/src/dispatcher/updater.rs new file mode 100644 index 00000000..8dd14da1 --- /dev/null +++ b/src/dispatcher/updater.rs @@ -0,0 +1,147 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project::pin_project; +use futures::{Stream, StreamExt, stream}; + +use crate::{ + bot::Bot, + types::Update, + RequestError, +}; + +// Currently just a placeholder, but I'll add here some methods +/// Updater is stream of updates. +/// +/// Telegram supports 2 ways of [getting updates]: [long polling](Long Polling) and webhook +/// +/// ## Long Polling +/// +/// In long polling ([wiki]) you just call [GetUpdates] every N seconds. +/// +/// #### Example: +/// +/// +/// +/// ^1 Timeout can be even 0 +/// (this is also called short polling), +/// but you should use it **only** for testing purposes +/// +/// ^2 Large delays will cause in bot lags, +/// so delay shouldn't exceed second. +/// +/// ^3 Note that if telegram already have updates for +/// you it will answer you **without** waiting for a timeout +/// +/// ^4 `offset = N` means that we've already received +/// updates `0..=N` +/// +/// [GetUpdates]: crate::requests::GetUpdates +/// [getting updates]: https://core.telegram.org/bots/api#getting-updates +/// [wiki]: https://en.wikipedia.org/wiki/Push_technology#Long_polling +pub trait Updater: Stream> {} + +#[pin_project] +pub struct StreamUpdater { + #[pin] + stream: S +} + +impl StreamUpdater { + pub fn new(stream: S) -> Self { + Self { stream } + } +} + +impl Stream for StreamUpdater where S: Stream> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_next(cx) + } +} + +impl Updater for StreamUpdater where S: Stream> {} + +pub fn polling<'a>(bot: &'a Bot) -> impl Updater + 'a { + let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move { + // this match converts Result, _> -> Vec> + let updates = match bot.get_updates().offset(offset).send().await { + Ok(updates) => { + if let Some(upd) = updates.last() { + offset = upd.id + 1; + } + updates.into_iter().map(|u| Ok(u)).collect::>() + }, + Err(err) => vec![Err(err)] + }; + Some((stream::iter(updates), (bot, offset))) + }) + .flatten(); + + StreamUpdater { stream } +} + +// TODO implement webhook (this actually require webserver and probably we +// should add cargo feature that adds webhook) +//pub fn webhook<'a>(bot: &'a Bot, cfg: WebhookConfig) -> Updater> + 'a> {} diff --git a/src/lib.rs b/src/lib.rs index b33d3bf3..8d244ff6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,5 +11,6 @@ mod errors; mod network; pub mod bot; +pub mod dispatcher; pub mod requests; pub mod types; diff --git a/src/types/update.rs b/src/types/update.rs index f6eaa3eb..14251f3d 100644 --- a/src/types/update.rs +++ b/src/types/update.rs @@ -9,7 +9,6 @@ pub struct Update { } #[derive(Debug, Deserialize, PartialEq, Clone)] -#[serde(tag = "type")] #[serde(rename_all = "snake_case")] pub enum UpdateKind { Message(Message), @@ -22,4 +21,76 @@ pub enum UpdateKind { CallbackQuery(CallbackQuery), } -// TODO: tests for deserialization +#[cfg(test)] +mod test { + use crate::types::{ + Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind, Sender, + Update, UpdateKind, User, + }; + + // TODO: more tests for deserialization + #[test] + fn message() { + let json = r#"{ + "update_id":892252934, + "message":{ + "message_id":6557, + "from":{ + "id":218485655, + "is_bot": false, + "first_name":"Waffle", + "username":"WaffleLapkin", + "language_code":"en" + }, + "chat":{ + "id":218485655, + "first_name":"Waffle", + "username":"WaffleLapkin", + "type":"private" + }, + "date":1569518342, + "text":"hello there" + } + }"#; + + let expected: Update = Update { + id: 892252934, + kind: UpdateKind::Message(Message { + id: 6557, + date: 1569518342, + chat: Chat { + id: 218485655, + kind: ChatKind::Private { + type_: (), + username: Some(String::from("WaffleLapkin")), + first_name: Some(String::from("Waffle")), + last_name: None, + }, + photo: None, + }, + kind: MessageKind::Common { + from: Sender::User(User { + id: 218485655, + is_bot: false, + first_name: String::from("Waffle"), + last_name: None, + username: Some(String::from("WaffleLapkin")), + language_code: Some(String::from("en")), + }), + forward_kind: ForwardKind::Origin { + reply_to_message: None, + }, + edit_date: None, + media_kind: MediaKind::Text { + text: String::from("hello there"), + entities: vec![], + }, + reply_markup: None, + }, + }), + }; + + let actual = serde_json::from_str::(json).unwrap(); + assert_eq!(expected, actual); + } +}
+///     tg                           bot
+///      |                            |
+///      |<---------------------------| Updates? (GetUpdates call)
+///      ↑                            ↑
+///      |          timeout^1         |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      ↑                            ↑
+///      | delay between GetUpdates^2 |
+///      ↓                            ↓
+///      |<---------------------------| Updates?
+///      ↑                            ↑
+///      |          timeout^3         |
+///      ↓                            ↓
+/// Yes  |-------[updates 0, 1]------>|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 1]--------| Updates?^4
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Yes  |---------[update 2]-------->|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 2]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 2]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 2]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Yes  |-------[updates 2..5]------>|
+///      ↑                            ↑
+///      |           delay            |
+///      ↓                            ↓
+///      |<-------[offset = 5]--------| Updates?
+///      ↑                            ↑
+///      |           timeout          |
+///      ↓                            ↓
+/// Nope |--------------------------->|
+///      |                            |
+///      ~    and so on, and so on    ~
+///