Refactor teloxide::dispatching

This commit is contained in:
Temirkhan Myrzamadi 2019-12-30 04:46:04 +06:00
parent 0c0e1014c1
commit 410c1cc8d8
10 changed files with 455 additions and 457 deletions

View file

@ -76,10 +76,7 @@ impl Bot {
/// # use teloxide::{Bot, requests::payloads::SendAnimation, types::InputFile};
/// # #[tokio::main] async fn main_() {
/// let bot = Bot::new("TOKEN");
/// let payload = SendAnimation::new(
/// 123456,
/// InputFile::Url(String::from("https://example.com"))
/// );
/// let payload = SendAnimation::new(123456, InputFile::Url(String::from("https://example.com")));
/// bot.execute_multipart(&payload).await;
/// # }
/// ```

View file

@ -1,47 +1,25 @@
//! A dispatcher based on filters.
use futures::StreamExt;
use async_trait::async_trait;
use crate::{
dispatching::{
dispatchers::filter::error_policy::ErrorPolicy, filters::Filter,
handler::Handler, updater::Updater, Dispatcher,
},
dispatching::{filters::Filter, ErrorHandler, Handler, Updater},
types::{
CallbackQuery, ChosenInlineResult, InlineQuery, Message, Update,
UpdateKind,
},
};
pub mod error_policy;
type FilterWithHandler<'a, T, E> =
(Box<dyn Filter<T> + 'a>, Box<dyn Handler<T, E> + 'a>);
type FiltersWithHandlers<'a, T, E> = Vec<FilterWithHandler<'a, T, E>>;
struct FilterAndHandler<'a, T, E> {
filter: Box<dyn Filter<T> + 'a>,
handler: Box<dyn Handler<'a, T, E> + 'a>,
}
impl<'a, T, E> FilterAndHandler<'a, T, E> {
fn new<F, H>(filter: F, handler: H) -> Self
where
F: Filter<T> + 'a,
H: Handler<'a, T, E> + 'a,
{
FilterAndHandler {
filter: Box::new(filter),
handler: Box::new(handler),
}
}
}
type FiltersAndHandlers<'a, T, E> = Vec<FilterAndHandler<'a, T, E>>;
/// Dispatcher that dispatches updates from telegram.
/// A dispatcher based on filters.
///
/// This is 'filter' implementation with following limitations:
/// - Error (`E` generic parameter) _must_ implement [`std::fmt::Debug`]
/// - All 'handlers' are boxed
/// - Handler's fututres are also boxed
/// - All errors from [updater] are ignored (TODO: remove this limitation)
/// - All handlers executed in order (this means that in dispatching have 2
/// upadtes it will first execute some handler into complition with first
/// update and **then** search for handler for second update, this is probably
@ -89,24 +67,32 @@ type FiltersAndHandlers<'a, T, E> = Vec<FilterAndHandler<'a, T, E>>;
///
/// [`std::fmt::Debug`]: std::fmt::Debug
/// [updater]: crate::dispatching::updater
pub struct FilterDispatcher<'a, E, Ep> {
message_handlers: FiltersAndHandlers<'a, Message, E>,
edited_message_handlers: FiltersAndHandlers<'a, Message, E>,
channel_post_handlers: FiltersAndHandlers<'a, Message, E>,
edited_channel_post_handlers: FiltersAndHandlers<'a, Message, E>,
inline_query_handlers: FiltersAndHandlers<'a, InlineQuery, E>,
pub struct FilterDispatcher<'a, E, Eh> {
message_handlers: FiltersWithHandlers<'a, Message, E>,
edited_message_handlers: FiltersWithHandlers<'a, Message, E>,
channel_post_handlers: FiltersWithHandlers<'a, Message, E>,
edited_channel_post_handlers: FiltersWithHandlers<'a, Message, E>,
inline_query_handlers: FiltersWithHandlers<'a, InlineQuery, E>,
chosen_inline_result_handlers:
FiltersAndHandlers<'a, ChosenInlineResult, E>,
callback_query_handlers: FiltersAndHandlers<'a, CallbackQuery, E>,
error_policy: Ep,
FiltersWithHandlers<'a, ChosenInlineResult, E>,
callback_query_handlers: FiltersWithHandlers<'a, CallbackQuery, E>,
error_handler: Eh,
}
impl<'a, E, Ep> FilterDispatcher<'a, E, Ep>
where
Ep: ErrorPolicy<E>,
E: std::fmt::Debug, // TODO: Is this really necessary?
{
pub fn new(error_policy: Ep) -> Self {
/// An error produced either from [`Updater`] or [`Handler`].
///
/// [`Updater`]: crate::dispatching::Updater
/// [`Handler`]: crate::dispatching::Handler
pub enum ErrorKind<E1, E2> {
FromUpdater(E1),
FromHandler(E2),
}
impl<'a, E2, Eh> FilterDispatcher<'a, E2, Eh> {
pub fn new<E1>(error_handler: Eh) -> Self
where
Eh: ErrorHandler<ErrorKind<E1, E2>>,
{
FilterDispatcher {
message_handlers: Vec::new(),
edited_message_handlers: Vec::new(),
@ -115,37 +101,37 @@ where
inline_query_handlers: Vec::new(),
chosen_inline_result_handlers: Vec::new(),
callback_query_handlers: Vec::new(),
error_policy,
error_handler,
}
}
pub fn message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
H: Handler<Message, E2> + 'a,
{
self.message_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn edited_message_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
H: Handler<Message, E2> + 'a,
{
self.edited_message_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn channel_post_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
H: Handler<Message, E2> + 'a,
{
self.channel_post_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
@ -156,20 +142,20 @@ where
) -> Self
where
F: Filter<Message> + 'a,
H: Handler<'a, Message, E> + 'a,
H: Handler<Message, E2> + 'a,
{
self.edited_channel_post_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn inline_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<InlineQuery> + 'a,
H: Handler<'a, InlineQuery, E> + 'a,
H: Handler<InlineQuery, E2> + 'a,
{
self.inline_query_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
@ -180,97 +166,121 @@ where
) -> Self
where
F: Filter<ChosenInlineResult> + 'a,
H: Handler<'a, ChosenInlineResult, E> + 'a,
H: Handler<ChosenInlineResult, E2> + 'a,
{
self.chosen_inline_result_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
pub fn callback_query_handler<F, H>(mut self, filter: F, handler: H) -> Self
where
F: Filter<CallbackQuery> + 'a,
H: Handler<'a, CallbackQuery, E> + 'a,
H: Handler<CallbackQuery, E2> + 'a,
{
self.callback_query_handlers
.push(FilterAndHandler::new(filter, handler));
.push((Box::new(filter), Box::new(handler)));
self
}
// TODO: Can someone simplify this?
pub async fn dispatch<U>(&mut self, updates: U)
pub async fn dispatch<E1, U>(&mut self, updater: U)
where
U: Updater + 'a,
U: Updater<E1> + 'a,
Eh: ErrorHandler<ErrorKind<E1, E2>>,
{
updates
.for_each(|res| {
async {
let Update { kind, id } = match res {
Ok(upd) => upd,
_ => return, // TODO: proper error handling
};
updater
.for_each(|res| async {
let Update { kind, id } = match res {
Ok(upd) => upd,
Err(err) => {
self.error_handler
.handle_error(ErrorKind::FromUpdater(err));
return;
}
};
log::debug!(
"Handled update#{id:?}: {kind:?}",
id = id,
kind = kind
);
log::debug!(
"Handled update#{id:?}: {kind:?}",
id = id,
kind = kind
);
match kind {
UpdateKind::Message(mes) => {
self.handle(mes, &self.message_handlers).await
}
UpdateKind::EditedMessage(mes) => {
self.handle(mes, &self.edited_message_handlers)
.await;
}
UpdateKind::ChannelPost(post) => {
self.handle(post, &self.channel_post_handlers)
.await;
}
UpdateKind::EditedChannelPost(post) => {
self.handle(
post,
&self.edited_channel_post_handlers,
)
.await;
}
UpdateKind::InlineQuery(query) => {
self.handle(query, &self.inline_query_handlers)
.await;
}
UpdateKind::ChosenInlineResult(result) => {
self.handle(
result,
&self.chosen_inline_result_handlers,
)
.await;
}
UpdateKind::CallbackQuery(callback) => {
self.handle(
callback,
&self.callback_query_handlers,
)
.await;
}
match kind {
UpdateKind::Message(mes) => {
Self::handle(
mes,
&mut self.message_handlers,
&mut self.error_handler,
)
.await
}
UpdateKind::EditedMessage(mes) => {
Self::handle(
mes,
&mut self.edited_message_handlers,
&mut self.error_handler,
)
.await;
}
UpdateKind::ChannelPost(post) => {
Self::handle(
post,
&mut self.channel_post_handlers,
&mut self.error_handler,
)
.await;
}
UpdateKind::EditedChannelPost(post) => {
Self::handle(
post,
&mut self.edited_channel_post_handlers,
&mut self.error_handler,
)
.await;
}
UpdateKind::InlineQuery(query) => {
Self::handle(
query,
&mut self.inline_query_handlers,
&mut self.error_handler,
)
.await;
}
UpdateKind::ChosenInlineResult(result) => {
Self::handle(
result,
&mut self.chosen_inline_result_handlers,
&mut self.error_handler,
)
.await;
}
UpdateKind::CallbackQuery(callback) => {
Self::handle(
callback,
&mut self.callback_query_handlers,
&mut self.error_handler,
)
.await;
}
}
})
.await;
}
#[allow(clippy::ptr_arg)] // TODO: proper fix
async fn handle<T>(
&self,
async fn handle<T, E1>(
update: T,
handlers: &FiltersAndHandlers<'a, T, E>,
handlers: &mut FiltersWithHandlers<'a, T, E2>,
error_handler: &mut Eh,
) where
T: std::fmt::Debug,
Eh: ErrorHandler<ErrorKind<E1, E2>>,
{
for x in handlers {
if x.filter.test(&update) {
if let Err(err) = x.handler.handle(update).await {
self.error_policy.handle_error(err).await
if x.0.test(&update) {
if let Err(err) = x.1.handle(update).await {
error_handler
.handle_error(ErrorKind::FromHandler(err))
.await
}
return;
@ -281,18 +291,6 @@ where
}
}
#[async_trait(? Send)]
impl<'a, U, E, Ep> Dispatcher<'a, U> for FilterDispatcher<'a, E, Ep>
where
E: std::fmt::Debug,
U: Updater + 'a,
Ep: ErrorPolicy<E>,
{
async fn dispatch(&'a mut self, updater: U) {
FilterDispatcher::dispatch(self, updater).await
}
}
#[cfg(test)]
mod tests {
use std::{

View file

@ -1,119 +0,0 @@
// Infallible used here instead of `!` to be compatible with rust <1.41
use std::{convert::Infallible, future::Future, pin::Pin};
use async_trait::async_trait;
/// Implementors of this trait are treated as error-handlers.
#[async_trait]
pub trait ErrorPolicy<E> {
async fn handle_error(&self, error: E)
where
E: 'async_trait;
}
/// Error policy that silently ignores all errors
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, Ignore,
/// };
///
/// Ignore.handle_error(()).await;
/// Ignore.handle_error(404).await;
/// Ignore.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Ignore;
#[async_trait]
impl<E> ErrorPolicy<E> for Ignore
where
E: Send,
{
async fn handle_error(&self, _: E)
where
E: 'async_trait,
{
}
}
/// Error policy that silently ignores all errors that can never happen (e.g.:
/// [`!`] or [`Infallible`])
///
/// ## Examples
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use std::convert::{TryInto, Infallible};
///
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy,
/// IgnoreSafe,
/// };
///
/// let result: Result<String, Infallible> = "str".try_into();
/// match result {
/// Ok(string) => println!("{}", string),
/// Err(inf) => IgnoreSafe.handle_error(inf).await,
/// }
///
/// IgnoreSafe.handle_error(return).await; // return type of `return` is `!` (aka never)
/// # }
/// ```
///
/// ```compile_fail
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, IgnoreSafe,
/// };
///
/// IgnoreSafe.handle_error(0);
/// ```
///
/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html
/// [`Infallible`]: std::convert::Infallible
pub struct IgnoreSafe;
#[allow(unreachable_code)]
#[async_trait]
impl ErrorPolicy<Infallible> for IgnoreSafe {
async fn handle_error(&self, _: Infallible)
where
Infallible: 'async_trait,
{
}
}
/// Implementation of `ErrorPolicy` for `async fn`s
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::dispatchers::filter::error_policy::ErrorPolicy;
///
/// let closure = |e: i32| async move { eprintln!("Error code{}", e) };
///
/// closure.handle_error(404).await;
/// # }
/// ```
impl<E, F, Fut> ErrorPolicy<E> for F
where
F: Fn(E) -> Fut + Sync,
Fut: Future<Output = ()> + Send,
E: Send,
{
fn handle_error<'s, 'async_trait>(
&'s self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>
where
's: 'async_trait,
Self: 'async_trait,
E: 'async_trait,
{
Box::pin(async move { self(error).await })
}
}

View file

@ -1,3 +1,5 @@
//! Different types of dispatchers.
pub use filter::FilterDispatcher;
pub mod filter;

View file

@ -0,0 +1,120 @@
//! Error handlers.
//!
//! Looks quite strange for now, but with stabilised asynchronous traits it
//! should be prettier.
// Infallible used here instead of `!` to be compatible with rust <1.41.
use std::{convert::Infallible, future::Future, pin::Pin};
/// A handler of an error.
pub trait ErrorHandler<E> {
fn handle_error<'a>(
&'a mut self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a;
}
/// A handler that silently ignores all errors.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handler::{ErrorHandler, Ignore};
///
/// Ignore.handle_error(()).await;
/// Ignore.handle_error(404).await;
/// Ignore.handle_error(String::from("error")).await;
/// # }
/// ```
pub struct Ignore;
impl<E> ErrorHandler<E> for Ignore {
#[must_use]
fn handle_error<'a>(
&'a mut self,
_: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async {})
}
}
/// An error handler that silently ignores all errors that can never happen
/// (e.g.: [`!`] or [`Infallible`]).
///
/// ## Examples
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use std::convert::{Infallible, TryInto};
///
/// use teloxide::dispatching::error_handler::{ErrorHandler, IgnoreSafe};
///
/// let result: Result<String, Infallible> = "str".try_into();
/// match result {
/// Ok(string) => println!("{}", string),
/// Err(inf) => IgnoreSafe.handle_error(inf).await,
/// }
///
/// IgnoreSafe.handle_error(return).await; // return type of `return` is `!` (aka never)
/// # }
/// ```
///
/// ```compile_fail
/// use teloxide::dispatching::dispatchers::filter::error_policy::{
/// ErrorPolicy, IgnoreSafe,
/// };
///
/// IgnoreSafe.handle_error(0);
/// ```
///
/// [`!`]: https://doc.rust-lang.org/std/primitive.never.html
/// [`Infallible`]: std::convert::Infallible
pub struct IgnoreSafe;
#[allow(unreachable_code)]
impl ErrorHandler<Infallible> for IgnoreSafe {
fn handle_error<'a>(
&'a mut self,
_: Infallible,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
Infallible: 'a,
{
Box::pin(async {})
}
}
/// The implementation of `ErrorHandler` for `Fn(error) -> Future<Output = ()>`.
///
/// ## Example
/// ```
/// # #[tokio::main]
/// # async fn main_() {
/// use teloxide::dispatching::error_handler::ErrorHandler;
///
/// let mut closure = |e: i32| async move { eprintln!("Error code{}", e) };
///
/// closure.handle_error(404).await;
/// # }
/// ```
impl<E, F, Fut> ErrorHandler<E> for F
where
F: FnMut(E) -> Fut,
Fut: Future<Output = ()>,
{
fn handle_error<'a>(
&'a mut self,
error: E,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
E: 'a,
{
Box::pin(async move { self(error).await })
}
}

View file

@ -1,3 +1,5 @@
//! Filters of messages.
pub use main::*;
pub use command::*;

View file

@ -1,44 +1,33 @@
use std::{future::Future, pin::Pin};
use futures::FutureExt;
pub type HandlerResult<E> = Result<(), E>;
/// Asynchronous handler for event `T` (like `&self, I -> Future` fn)
pub trait Handler<'a, T, E> {
fn handle(
&self,
/// A handler of a successful value.
pub trait Handler<T, E> {
#[must_use]
fn handle<'a>(
&'a mut self,
value: T,
) -> Pin<Box<dyn Future<Output = HandlerResult<E>> + 'a>>;
) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>>
where
T: 'a;
}
pub trait IntoHandlerResult<E> {
fn into_hr(self) -> HandlerResult<E>;
}
impl<E> IntoHandlerResult<E> for () {
fn into_hr(self) -> HandlerResult<E> {
Ok(())
}
}
impl<E> IntoHandlerResult<E> for HandlerResult<E> {
fn into_hr(self) -> HandlerResult<E> {
self
}
}
impl<'a, F, Fut, R, T, E> Handler<'a, T, E> for F
/// The implementation of `Handler` for `Fn(U) -> Future<Output = Result<(),
/// E>`.
///
/// Looks quite strange for now, but with stabilised asynchronous traits it
/// should be prettier.
impl<T, E, F, Fut> Handler<T, E> for F
where
F: Fn(T) -> Fut,
Fut: Future<Output = R> + 'a,
R: IntoHandlerResult<E> + 'a,
E: 'a,
F: FnMut(T) -> Fut,
Fut: Future<Output = Result<(), E>>,
{
fn handle(
&self,
fn handle<'a>(
&'a mut self,
value: T,
) -> Pin<Box<dyn Future<Output = HandlerResult<E>> + 'a>> {
Box::pin(self(value).map(IntoHandlerResult::into_hr))
) -> Pin<Box<dyn Future<Output = Fut::Output> + 'a>>
where
T: 'a,
{
Box::pin(async move { self(value).await })
}
}

View file

@ -1,15 +1,12 @@
//! Update dispatching.
use async_trait::async_trait;
pub mod dispatchers;
pub mod error_handler;
pub mod filters;
mod handler;
pub mod updaters;
pub use error_handler::ErrorHandler;
pub use filters::Filter;
pub use handler::Handler;
pub mod dispatchers;
pub mod filters;
pub mod handler;
pub mod updater;
#[async_trait(? Send)]
pub trait Dispatcher<'a, U> {
async fn dispatch(&'a mut self, updater: U);
}
pub use updaters::Updater;

View file

@ -1,159 +0,0 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{stream, Stream, StreamExt};
use pin_project::pin_project;
use crate::{bot::Bot, types::Update, RequestError};
// Currently just a placeholder, but I'll add here some methods
/// Updater is stream of updates.
///
/// Telegram supports 2 ways of [getting updates]: [long polling](Long Polling)
/// and webhook
///
/// ## Long Polling
///
/// In long polling ([wiki]) you just call [GetUpdates] every N seconds.
///
/// #### Example:
///
/// <pre>
/// tg bot
/// | |
/// |<---------------------------| Updates? (GetUpdates call)
/// ↑ ↑
/// | timeout<a id="1b" href="#1">^1</a> |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay between GetUpdates<a id="2b" href="#2">^2</a> |
/// ↓ ↓
/// |<---------------------------| Updates?
/// ↑ ↑
/// | timeout<a id="3b" href="#3">^3</a> |
/// ↓ ↓
/// Yes |-------[updates 0, 1]------>|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 1]--------| Updates?<a id="4b" href="#4">^4</a>
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Yes |---------[update 2]-------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 2]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Yes |-------[updates 2..5]------>|
/// ↑ ↑
/// | delay |
/// ↓ ↓
/// |<-------[offset = 5]--------| Updates?
/// ↑ ↑
/// | timeout |
/// ↓ ↓
/// Nope |--------------------------->|
/// | |
/// ~ and so on, and so on ~
/// </pre>
///
/// <a id="1" href="#1b">^1</a> Timeout can be even 0
/// (this is also called short polling),
/// but you should use it **only** for testing purposes
///
/// <a id="2" href="#2b">^2</a> Large delays will cause in bot lags,
/// so delay shouldn't exceed second.
///
/// <a id="3" href="#3b">^3</a> Note that if telegram already have updates for
/// you it will answer you **without** waiting for a timeout
///
/// <a id="4" href="#4b">^4</a> `offset = N` means that we've already received
/// updates `0..=N`
///
/// [GetUpdates]: crate::requests::payloads::GetUpdates
/// [getting updates]: https://core.telegram.org/bots/api#getting-updates
/// [wiki]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
pub trait Updater:
Stream<Item = Result<Update, <Self as Updater>::Error>>
{
type Error;
}
#[pin_project]
pub struct StreamUpdater<S> {
#[pin]
stream: S,
}
impl<S> StreamUpdater<S> {
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S, E> Stream for StreamUpdater<S>
where
S: Stream<Item = Result<Update, E>>,
{
type Item = Result<Update, E>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
}
impl<S, E> Updater for StreamUpdater<S>
where
S: Stream<Item = Result<Update, E>>,
{
type Error = E;
}
pub fn polling<'a>(bot: &'a Bot) -> impl Updater<Error = RequestError> + 'a {
let stream = stream::unfold((bot, 0), |(bot, mut offset)| async move {
let updates = match bot.get_updates().offset(offset).send().await {
Ok(updates) => {
if let Some(upd) = updates.last() {
offset = upd.id + 1;
}
updates.into_iter().map(Ok).collect::<Vec<_>>()
}
Err(err) => vec![Err(err)],
};
Some((stream::iter(updates), (bot, offset)))
})
.flatten();
StreamUpdater { stream }
}
// TODO implement webhook (this actually require webserver and probably we
// should add cargo feature that adds webhook)
//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater<impl
// Stream<Item=Result<Update, ???>> + 'a> {}

171
src/dispatching/updaters.rs Normal file
View file

@ -0,0 +1,171 @@
//! Receiving updates from Telegram.
//!
//! The key trait here is [`Updater`]. You can get it by these functions:
//!
//! - [`polling_basic`], which returns a default long polling updater.
//! - [`polling_advanced`], which returns a long/short polling updater with
//! your configuration.
//!
//! And then you can pass it directly to a dispatcher.
//!
//! Telegram supports two ways of [getting updates]: [long]/[short] polling and
//! [webhook].
//!
//! # Long Polling
//!
//! In long polling, you just call [`Box::get_updates`] every N seconds.
//!
//! ## Example
//!
//! <pre>
//! tg bot
//! | |
//! |<---------------------------| Updates? (Bot::get_updates call)
//! ↑ ↑
//! | timeout<a id="1b" href="#1">^1</a> |
//! ↓ ↓
//! Nope |--------------------------->|
//! ↑ ↑
//! | delay between Bot::get_updates<a id="2b" href="#2">^2</a> |
//! ↓ ↓
//! |<---------------------------| Updates?
//! ↑ ↑
//! | timeout<a id="3b" href="#3">^3</a> |
//! ↓ ↓
//! Yes |-------[updates 0, 1]------>|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 1]--------| Updates?<a id="4b" href="#4">^4</a>
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Yes |---------[update 2]-------->|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 2]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Nope |--------------------------->|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 2]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Nope |--------------------------->|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 2]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Yes |-------[updates 2..5]------>|
//! ↑ ↑
//! | delay |
//! ↓ ↓
//! |<-------[offset = 5]--------| Updates?
//! ↑ ↑
//! | timeout |
//! ↓ ↓
//! Nope |--------------------------->|
//! | |
//! ~ and so on, and so on ~
//! </pre>
//!
//! <a id="1" href="#1b">^1</a> A timeout can be even 0
//! (this is also called short polling),
//! but you should use it **only** for testing purposes.
//!
//! <a id="2" href="#2b">^2</a> Large delays will cause in bot lags,
//! so delay shouldn't exceed second.
//!
//! <a id="3" href="#3b">^3</a> Note that if Telegram already have updates for
//! you it will answer you **without** waiting for a timeout.
//!
//! <a id="4" href="#4b">^4</a> `offset = N` means that we've already received
//! updates `0..=N`.
//!
//! [`Updater`]: Updater
//! [`polling_basic`]: polling_basic
//! [`polling_advanced`]: polling_advanced
//! [`Dispatcher`]: crate::dispatching::Dispatcher::dispatch
//! [`Box::get_updates`]: crate::Bot::get_updates
//! [getting updates]: https://core.telegram.org/bots/api#getting-updates
//! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
//! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science)
//! [webhook]: https://en.wikipedia.org/wiki/Webhook
use futures::{stream, Stream, StreamExt};
use crate::{
bot::Bot, requests::payloads::AllowedUpdate, types::Update, RequestError,
};
use std::{convert::TryInto, time::Duration};
/// A generic updater.
pub trait Updater<E>: Stream<Item = Result<Update, E>> {
// TODO: add some methods here (.shutdown(), etc).
}
impl<S, E> Updater<E> for S where S: Stream<Item = Result<Update, E>> {}
/// Returns a long polling updater with the default configuration.
///
/// It is the same as calling [`polling_advanced`] with `timeout` of 30 seconds,
/// `limit=100` and receive all kinds of updates.
///
/// [`polling_advanced`]: polling_advanced
pub fn polling_basic(bot: &Bot) -> impl Updater<RequestError> + '_ {
polling_advanced::<&[_]>(bot, Duration::from_secs(30), 100, &[])
}
/// Returns a long/short polling updater with some additional options.
///
/// - `bot`: Using this bot, the returned updater will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
pub fn polling_advanced<'a, A>(
bot: &'a Bot,
timeout: Duration,
limit: u8,
allowed_updates: A,
) -> impl Updater<RequestError> + 'a
where
A: Into<&'a [AllowedUpdate]>,
{
let mut allowed_updates = Some(allowed_updates.into());
stream::unfold((bot, 0), move |(bot, mut offset)| async move {
let updates = bot
.get_updates()
.offset(offset)
.timeout(timeout.as_secs().try_into().expect("timeout is too big"))
.limit(limit)
.allowed_updates(allowed_updates.take().unwrap_or(&[]))
.send()
.await
.map_or_else(
|err| vec![Err(err)],
|updates| {
if let Some(upd) = updates.last() {
offset = upd.id + 1;
}
updates.into_iter().map(Ok).collect::<Vec<_>>()
},
);
Some((stream::iter(updates), (bot, offset)))
})
.flatten()
}
// TODO implement webhook (this actually require webserver and probably we
// should add cargo feature that adds webhook)
//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater<impl
// Stream<Item=Result<Update, ???>> + 'a> {}