Merge pull request #20 from WaffleLapkin/dispatcher

[WIP] Dispatcher impl
This commit is contained in:
Temirkhan Myrzamadi 2019-10-15 05:38:26 +00:00 committed by GitHub
commit 4f17f23b69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 987 additions and 3 deletions

View file

@ -12,6 +12,8 @@ serde = { version = "1.0.101", features = ["derive"] }
derive_more = "0.15.0"
tokio = "0.2.0-alpha.6"
bytes = "0.4.12"
log = "0.4.8"
pin-project = "0.4.0-alpha.7"
futures-preview = "0.3.0-alpha.19"
async-trait = "0.1.13"
thiserror = "1.0.2"

View file

@ -6,7 +6,7 @@ use crate::{
PromoteChatMember, RestrictChatMember, SendAudio, SendChatAction,
SendContact, SendLocation, SendMediaGroup, SendMessage, SendPhoto,
SendPoll, SendVenue, SendVideoNote, SendVoice, StopMessageLiveLocation,
UnbanChatMember, UnpinChatMessage,
UnbanChatMember, UnpinChatMessage, GetUpdates
},
types::{ChatAction, ChatId, ChatPermissions, InputFile, InputMedia},
};
@ -17,6 +17,10 @@ impl Bot {
GetMe::new(self.ctx())
}
pub fn get_updates(&self) -> GetUpdates {
GetUpdates::new(self.ctx())
}
pub fn send_message<C, T>(&self, chat_id: C, text: T) -> SendMessage
where
C: Into<ChatId>,

372
src/dispatcher/filter.rs Normal file
View file

@ -0,0 +1,372 @@
/// Filter that determines that particular event
/// is suitable for particular handler.
pub trait Filter<T> {
/// Passes (return true) if event is suitable (otherwise return false)
fn test(&self, value: &T) -> bool;
}
/// ```
/// use async_telegram_bot::dispatcher::filter::Filter;
///
/// let closure = |i: &i32| -> bool { *i >= 42 };
/// assert!(closure.test(&42));
/// assert!(closure.test(&100));
///
/// assert_eq!(closure.test(&41), false);
/// assert_eq!(closure.test(&0), false);
/// ```
impl<T, F: Fn(&T) -> bool> Filter<T> for F {
fn test(&self, value: &T) -> bool {
(self)(value)
}
}
/// ```
/// use async_telegram_bot::dispatcher::filter::Filter;
///
/// assert!(true.test(&()));
/// assert_eq!(false.test(&()), false);
/// ```
impl<T> Filter<T> for bool {
fn test(&self, _: &T) -> bool { *self }
}
/// And filter.
///
/// Passes if both underlying filters pass.
///
/// **NOTE**: if one of filters don't pass
/// it is **not** guaranteed that other will be executed.
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{And, Filter};
///
/// // Note: bool can be treated as `Filter` that always return self.
/// assert_eq!(And::new(true, false).test(&()), false);
/// assert_eq!(And::new(true, false).test(&()), false);
/// assert!(And::new(true, true).test(&()));
/// assert!(And::new(true, And::new(|_: &()| true, true)).test(&()));
/// ```
#[derive(Debug, Clone, Copy)]
pub struct And<A, B>(A, B);
impl<A, B> And<A, B> {
pub fn new(a: A, b: B) -> Self {
And(a, b)
}
}
impl<T, A, B> Filter<T> for And<A, B>
where
A: Filter<T>,
B: Filter<T>,
{
fn test(&self, value: &T) -> bool {
self.0.test(value) && self.1.test(value)
}
}
/// Alias for [`And::new`]
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{and, Filter};
///
/// assert!(and(true, true).test(&()));
/// assert_eq!(and(true, false).test(&()), false);
/// ```
///
/// [`And::new`]: crate::dispatcher::filter::And::new
pub fn and<A, B>(a: A, b: B) -> And<A, B> {
And::new(a, b)
}
/// Or filter.
///
/// Passes if at least one underlying filters passes.
///
/// **NOTE**: if one of filters passes
/// it is **not** guaranteed that other will be executed.
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{Or, Filter};
///
/// // Note: bool can be treated as `Filter` that always return self.
/// assert!(Or::new(true, false).test(&()));
/// assert!(Or::new(false, true).test(&()));
/// assert!(Or::new(false, Or::new(|_: &()| true, false)).test(&()));
/// assert_eq!(Or::new(false, false).test(&()), false);
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Or<A, B>(A, B);
impl<A, B> Or<A, B> {
pub fn new(a: A, b: B) -> Self {
Or(a, b)
}
}
impl<T, A, B> Filter<T> for Or<A, B>
where
A: Filter<T>,
B: Filter<T>,
{
fn test(&self, value: &T) -> bool {
self.0.test(value) || self.1.test(value)
}
}
/// Alias for [`Or::new`]
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{or, Filter};
///
/// assert!(or(true, false).test(&()));
/// assert_eq!(or(false, false).test(&()), false);
/// ```
///
/// [`Or::new`]: crate::dispatcher::filter::Or::new
pub fn or<A, B>(a: A, b: B) -> Or<A, B> {
Or::new(a, b)
}
/// Not filter.
///
/// Passes if underlying filter don't pass.
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{Not, Filter};
///
/// // Note: bool can be treated as `Filter` that always return self.
/// assert!(Not::new(false).test(&()));
/// assert_eq!(Not::new(true).test(&()), false);
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Not<A>(A);
impl<A> Not<A> {
pub fn new(a: A) -> Self {
Not(a)
}
}
impl<T, A> Filter<T> for Not<A>
where
A: Filter<T>,
{
fn test(&self, value: &T) -> bool {
!self.0.test(value)
}
}
/// Alias for [`Not::new`]
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{not, Filter};
///
/// assert!(not(false).test(&()));
/// assert_eq!(not(true).test(&()), false);
/// ```
///
/// [`Not::new`]: crate::dispatcher::filter::Not::new
pub fn not<A>(a: A) -> Not<A> {
Not::new(a)
}
/// Return [filter] that passes if and only if all of the given filters passes.
///
/// **NOTE**: if one of filters don't pass
/// it is **not** guaranteed that other will be executed.
///
/// ## Examples
/// ```
/// use async_telegram_bot::{all, dispatcher::filter::Filter};
///
/// assert!(all![true].test(&()));
/// assert!(all![true, true].test(&()));
/// assert!(all![true, true, true].test(&()));
///
/// assert_eq!(all![false].test(&()), false);
/// assert_eq!(all![true, false].test(&()), false);
/// assert_eq!(all![false, true].test(&()), false);
/// assert_eq!(all![false, false].test(&()), false);
/// ```
///
/// [filter]: crate::dispatcher::filter::Filter
#[macro_export]
macro_rules! all {
($one:expr) => { $one };
($head:expr, $($tail:tt)+) => {
$crate::dispatcher::filter::And::new(
$head,
$crate::all!($($tail)+)
)
};
}
/// Return [filter] that passes if any of the given filters passes.
///
/// **NOTE**: if one of filters passes
/// it is **not** guaranteed that other will be executed.
///
/// ## Examples
/// ```
/// use async_telegram_bot::{any, dispatcher::filter::Filter};
///
/// assert!(any![true].test(&()));
/// assert!(any![true, true].test(&()));
/// assert!(any![false, true].test(&()));
/// assert!(any![true, false, true].test(&()));
///
/// assert_eq!(any![false].test(&()), false);
/// assert_eq!(any![false, false].test(&()), false);
/// assert_eq!(any![false, false, false].test(&()), false);
/// ```
///
/// [filter]: crate::dispatcher::filter::Filter
#[macro_export]
macro_rules! any {
($one:expr) => { $one };
($head:expr, $($tail:tt)+) => {
$crate::dispatcher::filter::Or::new(
$head,
$crate::all!($($tail)+)
)
};
}
/// Simple wrapper around `Filter` that adds `|` and `&` operators.
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{Filter, f, F, And, Or};
///
/// let flt1 = |i: &i32| -> bool { *i > 17 };
/// let flt2 = |i: &i32| -> bool { *i < 42 };
/// let flt3 = |i: &i32| -> bool { *i % 2 == 0 };
///
/// let and = f(flt1) & flt2;
/// assert!(and.test(&19)); // both filters pass
///
/// assert_eq!(and.test(&50), false); // `flt2` doesn't pass
/// assert_eq!(and.test(&16), false); // `flt1` doesn't pass
///
///
/// let or = f(flt1) | flt3;
/// assert!(or.test(&19)); // `flt1` passes
/// assert!(or.test(&16)); // `flt2` passes
/// assert!(or.test(&20)); // both pass
///
/// assert_eq!(or.test(&17), false); // both don't pass
///
///
/// // Note: only first filter in chain should be wrapped in `f(...)`
/// let complicated: F<Or<And<_, _>, _>>= f(flt1) & flt2 | flt3;
/// assert!(complicated.test(&2)); // `flt3` passes
/// assert!(complicated.test(&21)); // `flt1` and `flt2` pass
///
/// assert_eq!(complicated.test(&15), false); // `flt1` and `flt3` don't pass
/// assert_eq!(complicated.test(&43), false); // `flt2` and `flt3` don't pass
/// ```
pub struct F<A>(A);
/// Constructor fn for [F]
///
/// [F]: crate::dispatcher::filter::F;
pub fn f<A>(a: A) -> F<A> {
F(a)
}
impl<T, A> Filter<T> for F<A>
where
A: Filter<T>
{
fn test(&self, value: &T) -> bool {
self.0.test(value)
}
}
impl<A, B> std::ops::BitAnd<B> for F<A> {
type Output = F<And<A, B>>;
fn bitand(self, other: B) -> Self::Output {
f(and(self.0, other))
}
}
impl<A, B> std::ops::BitOr<B> for F<A> {
type Output = F<Or<A, B>>;
fn bitor(self, other: B) -> Self::Output {
f(or(self.0, other))
}
}
/// Extensions for filters
pub trait FilterExt<T /* workaround for `E0207` compiler error */> {
/// Alias for [`Not::new`]
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt};
///
/// let flt = |i: &i32| -> bool { *i > 0 };
/// let flt = flt.not();
/// assert!(flt.test(&-1));
/// assert_eq!(flt.test(&1), false);
/// ```
///
/// [`Not::new`]: crate::dispatcher::filter::Not::new
fn not(self) -> Not<Self> where Self: Sized {
Not::new(self)
}
/// Alias for [`And::new`]
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt};
///
/// let flt = |i: &i32| -> bool { *i > 0 };
/// let flt = flt.and(|i: &i32| *i < 42);
///
/// assert!(flt.test(&1));
/// assert_eq!(flt.test(&-1), false);
/// assert_eq!(flt.test(&43), false);
/// ```
///
/// [`Not::new`]: crate::dispatcher::filter::And::new
fn and<B>(self, other: B) -> And<Self, B> where Self: Sized {
And::new(self, other)
}
/// Alias for [`Or::new`]
///
/// ## Examples
/// ```
/// use async_telegram_bot::dispatcher::filter::{Filter, FilterExt};
///
/// let flt = |i: &i32| -> bool { *i < 0 };
/// let flt = flt.or(|i: &i32| *i > 42);
///
/// assert!(flt.test(&-1));
/// assert!(flt.test(&43));
/// assert_eq!(flt.test(&17), false);
/// ```
///
/// [`Not::new`]: crate::dispatcher::filter::Or::new
fn or<B>(self, other: B) -> Or<Self, B> where Self: Sized {
Or::new(self, other)
}
}
// All methods implemented via defaults
impl<T, F> FilterExt<T> for F where F: Filter<T> {}

38
src/dispatcher/handler.rs Normal file
View file

@ -0,0 +1,38 @@
use futures::FutureExt;
use std::future::Future;
use std::pin::Pin;
pub type HandlerResult<E> = Result<(), E>;
/// Asynchronous handler for event `T` (like `&self, I -> Future` fn)
pub trait Handler<'a, T, E> {
fn handle(&self, value: T) -> Pin<Box<dyn Future<Output = HandlerResult<E>> + 'a>>;
}
pub trait IntoHandlerResult<E> {
fn into_hr(self) -> HandlerResult<E>;
}
impl<E> IntoHandlerResult<E> for () {
fn into_hr(self) -> HandlerResult<E> {
Ok(())
}
}
impl<E> IntoHandlerResult<E> for HandlerResult<E> {
fn into_hr(self) -> HandlerResult<E> {
self
}
}
impl<'a, F, Fut, R, T, E> Handler<'a, T, E> for F
where
F: Fn(T) -> Fut,
Fut: Future<Output = R> + 'a,
R: IntoHandlerResult<E> + 'a,
E: 'a,
{
fn handle(&self, value: T) -> Pin<Box<dyn Future<Output = HandlerResult<E>> + 'a>> {
Box::pin(self(value).map(IntoHandlerResult::into_hr))
}
}

9
src/dispatcher/mod.rs Normal file
View file

@ -0,0 +1,9 @@
//! POC implementation of update dispatching
pub mod filter;
pub mod handler;
pub mod simple;
pub mod updater;
pub use filter::Filter;
pub use handler::Handler;

View file

@ -0,0 +1,36 @@
use std::pin::Pin;
use std::future::Future;
use std::fmt::Debug;
// TODO: shouldn't it be trait?
pub enum ErrorPolicy<'a, E> {
Ignore,
Log,
Custom(Box<dyn Fn(E) -> Pin<Box<dyn Future<Output = ()> + 'a>>>),
}
impl<'a, E> ErrorPolicy<'a, E>
where
E: Debug,
{
pub async fn handle_error(&self, error: E) {
match self {
Self::Ignore => {},
Self::Log => {
// TODO: better message
log::error!("Error in handler: {:?}", error)
}
Self::Custom(func) => {
func(error).await
}
}
}
pub fn custom<F, Fut>(f: F) -> Self
where
F: Fn(E) -> Fut + 'static,
Fut: Future<Output = ()> + 'a,
{
Self::Custom(Box::new(move |e| Box::pin(f(e))))
}
}

View file

@ -0,0 +1,304 @@
pub mod error_policy;
use crate::{
dispatcher::{
filter::Filter,
handler::Handler,
updater::Updater,
},
types::{
Update,
Message,
UpdateKind,
CallbackQuery,
ChosenInlineResult,
},
};
use futures::StreamExt;
use crate::dispatcher::simple::error_policy::ErrorPolicy;
type Handlers<'a, T, E> = Vec<(Box<dyn Filter<T> + 'a>, Box<dyn Handler<'a, T, E> + 'a>)>;
/// Dispatcher that dispatches updates from telegram.
///
/// This is 'simple' implementation with following limitations:
/// - Error (`E` generic parameter) _must_ implement [`std::fmt::Debug`]
/// - All 'handlers' are boxed
/// - Handler's fututres are also boxed
/// - [Custom error policy] is also boxed
/// - All errors from [updater] are ignored (TODO: remove this limitation)
/// - All handlers executed in order (this means that in dispatcher have
/// 2 upadtes it will first execute some handler into complition with
/// first update and **then** search for handler for second update,
/// this is probably wrong)
///
/// ## Examples
///
/// Simplest example:
/// ```no_run
/// # async fn run() {
/// use std::convert::Infallible;
/// use async_telegram_bot::{
/// bot::Bot,
/// types::Message,
/// dispatcher::{
/// updater::polling,
/// simple::{Dispatcher, error_policy::ErrorPolicy},
/// },
/// };
///
/// async fn handle_edited_message(mes: Message) {
/// println!("Edited message: {:?}", mes)
/// }
///
/// let bot = Bot::new("TOKEN");
///
/// // create dispatcher which handlers can't fail
/// // with error policy that just ignores all errors (that can't ever happen)
/// let mut dp = Dispatcher::<Infallible>::new(ErrorPolicy::Ignore)
/// // Add 'handler' that will handle all messages sent to the bot
/// .message_handler(true, |mes: Message| async move {
/// println!("New message: {:?}", mes)
/// })
/// // Add 'handler' that will handle all
/// // messages edited in chat with the bot
/// .edited_message_handler(true, handle_edited_message);
///
/// // Start dispatching updates from long polling
/// dp.dispatch(polling(&bot)).await;
/// # }
/// ```
///
/// [`std::fmt::Debug`]: std::fmt::Debug
/// [Custom error policy]: crate::dispatcher::simple::error_policy::ErrorPolicy::Custom
/// [updater]: crate::dispatcher::updater
pub struct Dispatcher<'a, E> {
message_handlers: Handlers<'a, Message, E>,
edited_message_handlers: Handlers<'a, Message, E>,
channel_post_handlers: Handlers<'a, Message, E>,
edited_channel_post_handlers: Handlers<'a, Message, E>,
inline_query_handlers: Handlers<'a, (), E>,
chosen_inline_result_handlers: Handlers<'a, ChosenInlineResult, E>,
callback_query_handlers: Handlers<'a, CallbackQuery, E>,
error_policy: ErrorPolicy<'a, E>,
}
impl<'a, E> Dispatcher<'a, E>
where
E: std::fmt::Debug, // TODO: Is this really necessary?
{
pub fn new(error_policy: ErrorPolicy<'a, E>) -> Self {
Dispatcher {
message_handlers: Vec::new(),
edited_message_handlers: Vec::new(),
channel_post_handlers: Vec::new(),
edited_channel_post_handlers: Vec::new(),
inline_query_handlers: Vec::new(),
chosen_inline_result_handlers: Vec::new(),
callback_query_handlers: Vec::new(),
error_policy
}
}
pub fn message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.message_handlers.push((Box::new(filter), Box::new(handler)));
self
}
pub fn edited_message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.edited_message_handlers.push((Box::new(filter), Box::new(handler)));
self
}
pub fn channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.channel_post_handlers.push((Box::new(filter), Box::new(handler)));
self
}
pub fn edited_channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
{
self.edited_channel_post_handlers.push((Box::new(filter), Box::new(handler)));
self
}
pub fn inline_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<()> + 'a,
H: Handler<'a, (), E> + 'a,
{
self.inline_query_handlers.push((Box::new(filter), Box::new(handler)));
self
}
pub fn chosen_inline_result_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<ChosenInlineResult> + 'a,
H: Handler<'a, ChosenInlineResult, E> + 'a,
{
self.chosen_inline_result_handlers.push((Box::new(filter), Box::new(handler)));
self
}
pub fn callback_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<CallbackQuery> + 'a,
H: Handler<'a, CallbackQuery, E> + 'a,
{
self.callback_query_handlers.push((Box::new(filter), Box::new(handler)));
self
}
// TODO: Can someone simplify this?
pub async fn dispatch<U, UE>(&mut self, updates: U)
where
U: Updater<UE> + 'a
{
updates.for_each(|res| {
async {
let res = res;
let Update { kind, id } = match res {
Ok(upd) => upd,
_ => return // TODO: proper error handling
};
log::debug!("Handled update#{id:?}: {kind:?}", id = id, kind = kind);
// TODO: can someone extract this to a function?
macro_rules! call {
($h:expr, $value:expr) => {{
let value = $value;
let handler = $h.iter().find_map(|e| {
let (filter, handler) = e;
if filter.test(&value) {
Some(handler)
} else {
None
}
});
match handler {
Some(handler) => {
if let Err(err) = handler.handle(value).await {
self.error_policy.handle_error(err).await;
}
},
None => log::warn!("Unhandled update: {:?}", value)
}
}};
}
match kind {
UpdateKind::Message(mes) => call!(self.message_handlers, mes),
UpdateKind::EditedMessage(mes) => call!(self.edited_message_handlers, mes),
UpdateKind::ChannelPost(post) => call!(self.channel_post_handlers, post),
UpdateKind::EditedChannelPost(post) => call!(self.edited_channel_post_handlers, post),
UpdateKind::InlineQuery(query) => call!(self.inline_query_handlers, query),
UpdateKind::ChosenInlineResult(result) => call!(self.chosen_inline_result_handlers, result),
UpdateKind::CallbackQuery(callback) => call!(self.callback_query_handlers, callback),
}
}
})
.await;
}
}
#[cfg(test)]
mod tests {
use std::convert::Infallible;
use std::sync::atomic::{AtomicI32, Ordering};
use crate::{
types::{
Message, ChatKind, MessageKind, Sender, ForwardKind, MediaKind, Chat, User, Update, UpdateKind
},
dispatcher::{simple::{Dispatcher, error_policy::ErrorPolicy}, updater::StreamUpdater},
};
use futures::Stream;
#[tokio::test]
async fn first_handler_executes_1_time() {
let counter = &AtomicI32::new(0);
let counter2 = &AtomicI32::new(0);
let mut dp = Dispatcher::<Infallible>::new(ErrorPolicy::Ignore)
.message_handler(true, |_mes: Message| async move {
counter.fetch_add(1, Ordering::SeqCst);
})
.message_handler(true, |_mes: Message| async move {
counter2.fetch_add(1, Ordering::SeqCst);
Ok::<_, Infallible>(())
});
dp.dispatch(one_message_updater()).await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(counter2.load(Ordering::SeqCst), 0);
}
fn message() -> Message {
Message {
id: 6534,
date: 1567898953,
chat: Chat {
id: 218485655,
photo: None,
kind: ChatKind::Private {
type_: (),
first_name: Some("W".to_string()),
last_name: None,
username: Some("WaffleLapkin".to_string()),
},
},
kind: MessageKind::Common {
from: Sender::User(User {
id: 457569668,
is_bot: true,
first_name: "BT".to_string(),
last_name: None,
username: Some("BloodyTestBot".to_string()),
language_code: None,
}),
forward_kind: ForwardKind::Origin {
reply_to_message: None,
},
edit_date: None,
media_kind: MediaKind::Text {
text: "text".to_string(),
entities: vec![],
},
reply_markup: None,
},
}
}
fn message_update() -> Update {
Update { id: 0, kind: UpdateKind::Message(message()) }
}
fn one_message_updater() -> StreamUpdater<impl Stream<Item=Result<Update, Infallible>>> {
use futures::future::ready;
use futures::stream;
StreamUpdater::new(
stream::once(ready(Ok(message_update())))
)
}
}

147
src/dispatcher/updater.rs Normal file
View file

@ -0,0 +1,147 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use pin_project::pin_project;
use futures::{Stream, StreamExt, stream};
use crate::{
bot::Bot,
types::Update,
RequestError,
};
// Currently just a placeholder, but I'll add here some methods
/// Updater is stream of updates.
///
/// Telegram supports 2 ways of [getting updates]: [long polling](Long Polling) and webhook
///
/// ## Long Polling
///
/// In long polling ([wiki]) you just call [GetUpdates] every N seconds.
///
/// #### Example:
///
/// <pre>
/// tg bot
/// | |
/// |<---------------------------| Updates? (GetUpdates call)
/// ↑ ↑
/// | timeout<a id="1b" href="#1">^1</a> |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay between GetUpdates<a id="2b" href="#2">^2</a> |
/// ↓ ↓
/// |<---------------------------| Updates?
/// ↑ ↑
/// | timeout<a id="3b" href="#3">^3</a> |
/// ↓ ↓
/// Yes |-------[updates 0, 1]------>|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 1]--------| Updates?<a id="4b" href="#4">^4</a>
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Yes |---------[update 2]-------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Yes |-------[updates 2..5]------>|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 5]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// | |
/// ~ and so on, and so on ~
/// </pre>
///
/// <a id="1" href="#1b">^1</a> Timeout can be even 0
/// (this is also called short polling),
/// but you should use it **only** for testing purposes
///
/// <a id="2" href="#2b">^2</a> Large delays will cause in bot lags,
/// so delay shouldn't exceed second.
///
/// <a id="3" href="#3b">^3</a> Note that if telegram already have updates for
/// you it will answer you **without** waiting for a timeout
///
/// <a id="4" href="#4b">^4</a> `offset = N` means that we've already received
/// updates `0..=N`
///
/// [GetUpdates]: crate::requests::GetUpdates
/// [getting updates]: https://core.telegram.org/bots/api#getting-updates
/// [wiki]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
pub trait Updater<E>: Stream<Item=Result<Update, E>> {}
#[pin_project]
pub struct StreamUpdater<S> {
#[pin]
stream: S
}
impl<S> StreamUpdater<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S, E> Stream for StreamUpdater<S> where S: Stream<Item=Result<Update, E>> {
type Item = Result<Update, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
}
impl<S, E> Updater<E> for StreamUpdater<S> where S: Stream<Item=Result<Update, E>> {}
pub fn polling<'a>(bot: &'a Bot) -> impl Updater<RequestError> + 'a {
let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move {
// this match converts Result<Vec<_>, _> -> Vec<Result<_, _>>
let updates = match bot.get_updates().offset(offset).send().await {
Ok(updates) => {
if let Some(upd) = updates.last() {
offset = upd.id + 1;
}
updates.into_iter().map(|u| Ok(u)).collect::<Vec<_>>()
},
Err(err) => vec![Err(err)]
};
Some((stream::iter(updates), (bot, offset)))
})
.flatten();
StreamUpdater { stream }
}
// TODO implement webhook (this actually require webserver and probably we
// should add cargo feature that adds webhook)
//pub fn webhook<'a>(bot: &'a Bot, cfg: WebhookConfig) -> Updater<impl Stream<Item=Result<Update, ???>> + 'a> {}

View file

@ -11,5 +11,6 @@ mod errors;
mod network;
pub mod bot;
pub mod dispatcher;
pub mod requests;
pub mod types;

View file

@ -9,7 +9,6 @@ pub struct Update {
}
#[derive(Debug, Deserialize, PartialEq, Clone)]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
pub enum UpdateKind {
Message(Message),
@ -22,4 +21,76 @@ pub enum UpdateKind {
CallbackQuery(CallbackQuery),
}
// TODO: tests for deserialization
#[cfg(test)]
mod test {
use crate::types::{
Chat, ChatKind, ForwardKind, MediaKind, Message, MessageKind, Sender,
Update, UpdateKind, User,
};
// TODO: more tests for deserialization
#[test]
fn message() {
let json = r#"{
"update_id":892252934,
"message":{
"message_id":6557,
"from":{
"id":218485655,
"is_bot": false,
"first_name":"Waffle",
"username":"WaffleLapkin",
"language_code":"en"
},
"chat":{
"id":218485655,
"first_name":"Waffle",
"username":"WaffleLapkin",
"type":"private"
},
"date":1569518342,
"text":"hello there"
}
}"#;
let expected: Update = Update {
id: 892252934,
kind: UpdateKind::Message(Message {
id: 6557,
date: 1569518342,
chat: Chat {
id: 218485655,
kind: ChatKind::Private {
type_: (),
username: Some(String::from("WaffleLapkin")),
first_name: Some(String::from("Waffle")),
last_name: None,
},
photo: None,
},
kind: MessageKind::Common {
from: Sender::User(User {
id: 218485655,
is_bot: false,
first_name: String::from("Waffle"),
last_name: None,
username: Some(String::from("WaffleLapkin")),
language_code: Some(String::from("en")),
}),
forward_kind: ForwardKind::Origin {
reply_to_message: None,
},
edit_date: None,
media_kind: MediaKind::Text {
text: String::from("hello there"),
entities: vec![],
},
reply_markup: None,
},
}),
};
let actual = serde_json::from_str::<Update>(json).unwrap();
assert_eq!(expected, actual);
}
}