diff --git a/src/dispatcher/handler.rs b/src/dispatcher/handler.rs index b36f3e93..c17dcc9d 100644 --- a/src/dispatcher/handler.rs +++ b/src/dispatcher/handler.rs @@ -1,18 +1,17 @@ -use std::pin::Pin; use std::future::Future; - +use std::pin::Pin; /// Asynchronous handler for event `I` (like `&self, I -> Future` fn) -pub trait Handler { - fn handle(&self, value: I) -> Pin + 'static>>; +pub trait Handler<'a, I> { + fn handle(&self, value: I) -> Pin + 'a>>; } -impl Handler for F +impl<'a, Fut, T, F> Handler<'a, T> for F where - Fut: Future + 'static, + Fut: Future + 'a, F: Fn(T) -> Fut, { - fn handle(&self, value: T) -> Pin + 'static>> { + fn handle(&self, value: T) -> Pin + 'a>> { Box::pin((self)(value)) } } diff --git a/src/dispatcher/mod.rs b/src/dispatcher/mod.rs index f4f068f0..6805dd8f 100644 --- a/src/dispatcher/mod.rs +++ b/src/dispatcher/mod.rs @@ -1,8 +1,8 @@ /// POC implementation of update dispatching - pub mod filter; pub mod handler; pub mod simple; +pub mod updater; pub use filter::Filter; pub use handler::Handler; diff --git a/src/dispatcher/simple/mod.rs b/src/dispatcher/simple/mod.rs index f3be25b5..46f54559 100644 --- a/src/dispatcher/simple/mod.rs +++ b/src/dispatcher/simple/mod.rs @@ -1,9 +1,8 @@ -use std::{future::Future, pin::Pin}; - use crate::{ dispatcher::{ filter::Filter, handler::Handler, + updater::Updater, }, types::{ Update, @@ -14,22 +13,22 @@ use crate::{ }, }; -use tokio::stream::Stream; +use futures::StreamExt; -pub type Handlers = Vec<(Box>, Box>)>; +pub type Handlers<'a, T> = Vec<(Box + 'a>, Box + 'a>)>; -pub struct Dispatcher { - message_handlers: Handlers, - edited_message_handlers: Handlers, - channel_post_handlers: Handlers, - edited_channel_post_handlers: Handlers, - inline_query_handlers: Handlers<()>, - chosen_inline_result_handlers: Handlers, - callback_query_handlers: Handlers, +pub struct Dispatcher<'a> { + message_handlers: Handlers<'a, Message>, + edited_message_handlers: Handlers<'a, Message>, + channel_post_handlers: Handlers<'a, Message>, + edited_channel_post_handlers: Handlers<'a, Message>, + inline_query_handlers: Handlers<'a, ()>, + chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult>, + callback_query_handlers: Handlers<'a, CallbackQuery>, } -impl Dispatcher { +impl<'a> Dispatcher<'a> { pub fn new() -> Self { Dispatcher { message_handlers: Vec::new(), @@ -44,8 +43,8 @@ impl Dispatcher { pub fn message_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.message_handlers.push((Box::new(filter), Box::new(handler))); self @@ -53,8 +52,8 @@ impl Dispatcher { pub fn edited_message_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.edited_message_handlers.push((Box::new(filter), Box::new(handler))); self @@ -62,8 +61,8 @@ impl Dispatcher { pub fn channel_post_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.channel_post_handlers.push((Box::new(filter), Box::new(handler))); self @@ -71,8 +70,8 @@ impl Dispatcher { pub fn edited_channel_post_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, Message> + 'a, { self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler))); self @@ -80,8 +79,8 @@ impl Dispatcher { pub fn inline_query_handler(mut self, filter: F, handler: H) -> Self where - F: Filter<()> + 'static, - H: Handler<()> + 'static, + F: Filter<()> + 'a, + H: Handler<'a, ()> + 'a, { self.inline_query_handlers.push((Box::new(filter), Box::new(handler))); self @@ -89,8 +88,8 @@ impl Dispatcher { pub fn chosen_inline_result_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, ChosenInlineResult> + 'a, { self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler))); self @@ -98,114 +97,73 @@ impl Dispatcher { pub fn callback_query_handler(mut self, filter: F, handler: H) -> Self where - F: Filter + 'static, - H: Handler + 'static, + F: Filter + 'a, + H: Handler<'a, CallbackQuery> + 'a, { self.callback_query_handlers.push((Box::new(filter), Box::new(handler))); self } // TODO: Can someone simplify this? - pub async fn dispatch(&mut self, updates: S) + pub async fn dispatch(&mut self, updates: U) where - S: Stream + U: Updater + 'a { - use futures::StreamExt; + updates.for_each(|res| { + async { + let res = res; + let Update { kind, id } = match res { + Ok(upd) => upd, + _ => return // TODO: proper error handling + }; - let dp = &*self; + log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); - updates.for_each(|Update { id, kind }| async move { - log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); + // TODO: can someone extract this to a function? + macro_rules! call { + ($h:expr, $value:expr) => {{ + let value = $value; + let handler = $h.iter().find_map(|e| { + let (filter, handler) = e; + if filter.test(&value) { + Some(handler) + } else { + None + } + }); - match kind { - UpdateKind::Message(mes) => { - call_handler( - find_handler(&dp.message_handlers, &mes), - mes - ) - .await; - }, - UpdateKind::EditedMessage(mes) => { - call_handler( - find_handler(&dp.edited_message_handlers, &mes), - mes - ) - .await; - }, - UpdateKind::ChannelPost(post) => { - call_handler( - find_handler(&dp.channel_post_handlers, &post), - post - ) - .await; - }, - UpdateKind::EditedChannelPost(post) => { - call_handler( - find_handler(&dp.edited_channel_post_handlers, &post), - post - ) - .await; - }, - UpdateKind::InlineQuery(query) => { - call_handler( - find_handler(&dp.inline_query_handlers, &query), - query - ) - .await; - }, - UpdateKind::ChosenInlineResult(result) => { - call_handler( - find_handler(&dp.chosen_inline_result_handlers, &result), - result - ) - .await; - }, - UpdateKind::CallbackQuery(callback) => { - call_handler( - find_handler(&dp.callback_query_handlers, &callback), - callback - ) - .await; - }, + match handler { + Some(handler) => handler.handle(value).await, + None => log::warn!("Unhandled update: {:?}", value) + } + }}; + } + + match kind { + UpdateKind::Message(mes) => call!(self.message_handlers, mes), + UpdateKind::EditedMessage(mes) => call!(self.edited_message_handlers, mes), + UpdateKind::ChannelPost(post) => call!(self.channel_post_handlers, post), + UpdateKind::EditedChannelPost(post) => call!(self.edited_channel_post_handlers, post), + UpdateKind::InlineQuery(query) => call!(self.inline_query_handlers, query), + UpdateKind::ChosenInlineResult(result) => call!(self.chosen_inline_result_handlers, result), + UpdateKind::CallbackQuery(callback) => call!(self.callback_query_handlers, callback), + } } }) .await; } } -/// Helper function -fn find_handler<'a, T: std::fmt::Debug>(handlers: &'a Handlers, value: &T) -> Option<&'a Box>> { - let handler = handlers.iter().find_map(|e| { - let (filter, handler) = e; - if filter.test(value) { - Some(handler) - } else { - None - } - }); - - handler -} - -/// Helper function -async fn call_handler(handler: Option<&Box>>, value: T) { - match handler { - Some(handler) => handler.handle(value).await, - None => log::warn!("Unhandled update: {:?}", value) - } -} #[cfg(test)] mod tests { - use super::*; - #[tokio::test] async fn test() { use crate::{ types::{ Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind }, - dispatcher::simple::Dispatcher, + dispatcher::{simple::Dispatcher, updater::StreamUpdater}, }; let mes = Message { @@ -249,6 +207,9 @@ mod tests { let mut dp = Dispatcher::new() .message_handler(true, handler); - dp.dispatch(tokio::stream::iter(vec![Update { id: 0, kind: UpdateKind::Message(mes) }])).await; + use futures::future::ready; + use futures::stream; + + dp.dispatch(StreamUpdater::new(stream::once(ready(Result::<_, ()>::Ok(Update { id: 0, kind: UpdateKind::Message(mes) }))))).await; } } diff --git a/src/dispatcher/updater.rs b/src/dispatcher/updater.rs new file mode 100644 index 00000000..6574b738 --- /dev/null +++ b/src/dispatcher/updater.rs @@ -0,0 +1,62 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project::pin_project; +use futures::{Stream, StreamExt, stream}; + +use crate::{ + bot::Bot, + requests::Request, + types::Update, + RequestError, +}; + +// Currently just a placeholder, but I'll add here some methods +pub trait Updater: Stream> {} + +#[pin_project] +pub struct StreamUpdater { + #[pin] + stream: S +} + +impl StreamUpdater { + pub fn new(stream: S) -> Self { + Self { stream } + } +} + +impl Stream for StreamUpdater where S: Stream> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().stream.poll_next(cx) + } +} + +impl Updater for StreamUpdater where S: Stream> {} + +pub fn polling<'a>(bot: &'a Bot) -> impl Updater + 'a/*StreamUpdater> + 'a>*/ { + let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move { + // this match converts Result, _> -> Vec> + let updates = match bot.get_updates().offset(offset).send().await { + Ok(updates) => { + if let Some(upd) = updates.last() { + offset = upd.id + 1; + } + updates.into_iter().map(|u| Ok(u)).collect::>() + }, + Err(err) => vec![Err(err)] + }; + Some((stream::iter(updates), (bot, offset))) + }) + .flatten(); + + StreamUpdater { stream } +} + +// TODO implement webhook (this actually require webserver and probably we +// should add cargo feature that adds webhook) +//pub fn webhook<'a>(bot: &'a Bot, cfg: WebhookConfig) -> Updater> + 'a> {}