Implement polling...

This commit is contained in:
Waffle 2019-09-27 15:51:22 +03:00
parent eb090ab2af
commit f4a3c4f6bd
4 changed files with 138 additions and 116 deletions

View file

@ -1,18 +1,17 @@
use std::pin::Pin;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
/// Asynchronous handler for event `I` (like `&self, I -> Future` fn) /// Asynchronous handler for event `I` (like `&self, I -> Future` fn)
pub trait Handler<I> { pub trait Handler<'a, I> {
fn handle(&self, value: I) -> Pin<Box<dyn Future<Output=()> + 'static>>; fn handle(&self, value: I) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
} }
impl<Fut, T, F> Handler<T> for F impl<'a, Fut, T, F> Handler<'a, T> for F
where where
Fut: Future<Output = ()> + 'static, Fut: Future<Output = ()> + 'a,
F: Fn(T) -> Fut, F: Fn(T) -> Fut,
{ {
fn handle(&self, value: T) -> Pin<Box<dyn Future<Output=()> + 'static>> { fn handle(&self, value: T) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
Box::pin((self)(value)) Box::pin((self)(value))
} }
} }

View file

@ -1,8 +1,8 @@
/// POC implementation of update dispatching /// POC implementation of update dispatching
pub mod filter; pub mod filter;
pub mod handler; pub mod handler;
pub mod simple; pub mod simple;
pub mod updater;
pub use filter::Filter; pub use filter::Filter;
pub use handler::Handler; pub use handler::Handler;

View file

@ -1,9 +1,8 @@
use std::{future::Future, pin::Pin};
use crate::{ use crate::{
dispatcher::{ dispatcher::{
filter::Filter, filter::Filter,
handler::Handler, handler::Handler,
updater::Updater,
}, },
types::{ types::{
Update, Update,
@ -14,22 +13,22 @@ use crate::{
}, },
}; };
use tokio::stream::Stream; use futures::StreamExt;
pub type Handlers<T> = Vec<(Box<dyn Filter<T>>, Box<dyn Handler<T>>)>; pub type Handlers<'a, T> = Vec<(Box<dyn Filter<T> + 'a>, Box<dyn Handler<'a, T> + 'a>)>;
pub struct Dispatcher { pub struct Dispatcher<'a> {
message_handlers: Handlers<Message>, message_handlers: Handlers<'a, Message>,
edited_message_handlers: Handlers<Message>, edited_message_handlers: Handlers<'a, Message>,
channel_post_handlers: Handlers<Message>, channel_post_handlers: Handlers<'a, Message>,
edited_channel_post_handlers: Handlers<Message>, edited_channel_post_handlers: Handlers<'a, Message>,
inline_query_handlers: Handlers<()>, inline_query_handlers: Handlers<'a, ()>,
chosen_inline_result_handlers: Handlers<ChosenInlineResult>, chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult>,
callback_query_handlers: Handlers<CallbackQuery>, callback_query_handlers: Handlers<'a, CallbackQuery>,
} }
impl Dispatcher { impl<'a> Dispatcher<'a> {
pub fn new() -> Self { pub fn new() -> Self {
Dispatcher { Dispatcher {
message_handlers: Vec::new(), message_handlers: Vec::new(),
@ -44,8 +43,8 @@ impl Dispatcher {
pub fn message_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<Message> + 'static, F: Filter<Message> + 'a,
H: Handler<Message> + 'static, H: Handler<'a, Message> + 'a,
{ {
self.message_handlers.push((Box::new(filter), Box::new(handler))); self.message_handlers.push((Box::new(filter), Box::new(handler)));
self self
@ -53,8 +52,8 @@ impl Dispatcher {
pub fn edited_message_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn edited_message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<Message> + 'static, F: Filter<Message> + 'a,
H: Handler<Message> + 'static, H: Handler<'a, Message> + 'a,
{ {
self.edited_message_handlers.push((Box::new(filter), Box::new(handler))); self.edited_message_handlers.push((Box::new(filter), Box::new(handler)));
self self
@ -62,8 +61,8 @@ impl Dispatcher {
pub fn channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<Message> + 'static, F: Filter<Message> + 'a,
H: Handler<Message> + 'static, H: Handler<'a, Message> + 'a,
{ {
self.channel_post_handlers.push((Box::new(filter), Box::new(handler))); self.channel_post_handlers.push((Box::new(filter), Box::new(handler)));
self self
@ -71,8 +70,8 @@ impl Dispatcher {
pub fn edited_channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn edited_channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<Message> + 'static, F: Filter<Message> + 'a,
H: Handler<Message> + 'static, H: Handler<'a, Message> + 'a,
{ {
self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler))); self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler)));
self self
@ -80,8 +79,8 @@ impl Dispatcher {
pub fn inline_query_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn inline_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<()> + 'static, F: Filter<()> + 'a,
H: Handler<()> + 'static, H: Handler<'a, ()> + 'a,
{ {
self.inline_query_handlers.push((Box::new(filter), Box::new(handler))); self.inline_query_handlers.push((Box::new(filter), Box::new(handler)));
self self
@ -89,8 +88,8 @@ impl Dispatcher {
pub fn chosen_inline_result_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn chosen_inline_result_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<ChosenInlineResult> + 'static, F: Filter<ChosenInlineResult> + 'a,
H: Handler<ChosenInlineResult> + 'static, H: Handler<'a, ChosenInlineResult> + 'a,
{ {
self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler))); self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler)));
self self
@ -98,114 +97,73 @@ impl Dispatcher {
pub fn callback_query_handler<F, H>(mut self, filter: F, handler: H) -> Self pub fn callback_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where where
F: Filter<CallbackQuery> + 'static, F: Filter<CallbackQuery> + 'a,
H: Handler<CallbackQuery> + 'static, H: Handler<'a, CallbackQuery> + 'a,
{ {
self.callback_query_handlers.push((Box::new(filter), Box::new(handler))); self.callback_query_handlers.push((Box::new(filter), Box::new(handler)));
self self
} }
// TODO: Can someone simplify this? // TODO: Can someone simplify this?
pub async fn dispatch<S>(&mut self, updates: S) pub async fn dispatch<U, E>(&mut self, updates: U)
where where
S: Stream<Item=Update> U: Updater<E> + 'a
{ {
use futures::StreamExt; updates.for_each(|res| {
async {
let res = res;
let Update { kind, id } = match res {
Ok(upd) => upd,
_ => return // TODO: proper error handling
};
let dp = &*self; log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind);
updates.for_each(|Update { id, kind }| async move { // TODO: can someone extract this to a function?
log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind); macro_rules! call {
($h:expr, $value:expr) => {{
let value = $value;
let handler = $h.iter().find_map(|e| {
let (filter, handler) = e;
if filter.test(&value) {
Some(handler)
} else {
None
}
});
match kind { match handler {
UpdateKind::Message(mes) => { Some(handler) => handler.handle(value).await,
call_handler( None => log::warn!("Unhandled update: {:?}", value)
find_handler(&dp.message_handlers, &mes), }
mes }};
) }
.await;
}, match kind {
UpdateKind::EditedMessage(mes) => { UpdateKind::Message(mes) => call!(self.message_handlers, mes),
call_handler( UpdateKind::EditedMessage(mes) => call!(self.edited_message_handlers, mes),
find_handler(&dp.edited_message_handlers, &mes), UpdateKind::ChannelPost(post) => call!(self.channel_post_handlers, post),
mes UpdateKind::EditedChannelPost(post) => call!(self.edited_channel_post_handlers, post),
) UpdateKind::InlineQuery(query) => call!(self.inline_query_handlers, query),
.await; UpdateKind::ChosenInlineResult(result) => call!(self.chosen_inline_result_handlers, result),
}, UpdateKind::CallbackQuery(callback) => call!(self.callback_query_handlers, callback),
UpdateKind::ChannelPost(post) => { }
call_handler(
find_handler(&dp.channel_post_handlers, &post),
post
)
.await;
},
UpdateKind::EditedChannelPost(post) => {
call_handler(
find_handler(&dp.edited_channel_post_handlers, &post),
post
)
.await;
},
UpdateKind::InlineQuery(query) => {
call_handler(
find_handler(&dp.inline_query_handlers, &query),
query
)
.await;
},
UpdateKind::ChosenInlineResult(result) => {
call_handler(
find_handler(&dp.chosen_inline_result_handlers, &result),
result
)
.await;
},
UpdateKind::CallbackQuery(callback) => {
call_handler(
find_handler(&dp.callback_query_handlers, &callback),
callback
)
.await;
},
} }
}) })
.await; .await;
} }
} }
/// Helper function
fn find_handler<'a, T: std::fmt::Debug>(handlers: &'a Handlers<T>, value: &T) -> Option<&'a Box<Handler<T>>> {
let handler = handlers.iter().find_map(|e| {
let (filter, handler) = e;
if filter.test(value) {
Some(handler)
} else {
None
}
});
handler
}
/// Helper function
async fn call_handler<T: std::fmt::Debug>(handler: Option<&Box<Handler<T>>>, value: T) {
match handler {
Some(handler) => handler.handle(value).await,
None => log::warn!("Unhandled update: {:?}", value)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*;
#[tokio::test] #[tokio::test]
async fn test() { async fn test() {
use crate::{ use crate::{
types::{ types::{
Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind
}, },
dispatcher::simple::Dispatcher, dispatcher::{simple::Dispatcher, updater::StreamUpdater},
}; };
let mes = Message { let mes = Message {
@ -249,6 +207,9 @@ mod tests {
let mut dp = Dispatcher::new() let mut dp = Dispatcher::new()
.message_handler(true, handler); .message_handler(true, handler);
dp.dispatch(tokio::stream::iter(vec![Update { id: 0, kind: UpdateKind::Message(mes) }])).await; use futures::future::ready;
use futures::stream;
dp.dispatch(StreamUpdater::new(stream::once(ready(Result::<_, ()>::Ok(Update { id: 0, kind: UpdateKind::Message(mes) }))))).await;
} }
} }

62
src/dispatcher/updater.rs Normal file
View file

@ -0,0 +1,62 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use pin_project::pin_project;
use futures::{Stream, StreamExt, stream};
use crate::{
bot::Bot,
requests::Request,
types::Update,
RequestError,
};
// Currently just a placeholder, but I'll add here some methods
pub trait Updater<E>: Stream<Item=Result<Update, E>> {}
#[pin_project]
pub struct StreamUpdater<S> {
#[pin]
stream: S
}
impl<S> StreamUpdater<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S, E> Stream for StreamUpdater<S> where S: Stream<Item=Result<Update, E>> {
type Item = Result<Update, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
}
impl<S, E> Updater<E> for StreamUpdater<S> where S: Stream<Item=Result<Update, E>> {}
pub fn polling<'a>(bot: &'a Bot) -> impl Updater<RequestError> + 'a/*StreamUpdater<impl Stream<Item=ResponseResult<Update>> + 'a>*/ {
let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move {
// this match converts Result<Vec<_>, _> -> Vec<Result<_, _>>
let updates = match bot.get_updates().offset(offset).send().await {
Ok(updates) => {
if let Some(upd) = updates.last() {
offset = upd.id + 1;
}
updates.into_iter().map(|u| Ok(u)).collect::<Vec<_>>()
},
Err(err) => vec![Err(err)]
};
Some((stream::iter(updates), (bot, offset)))
})
.flatten();
StreamUpdater { stream }
}
// TODO implement webhook (this actually require webserver and probably we
// should add cargo feature that adds webhook)
//pub fn webhook<'a>(bot: &'a Bot, cfg: WebhookConfig) -> Updater<impl Stream<Item=Result<Update, ???>> + 'a> {}