Split update_listeners into multiple modules

This commit is contained in:
Waffle 2021-05-22 00:08:03 +03:00
parent a162478346
commit 881aa3d6b6
3 changed files with 247 additions and 225 deletions

View file

@ -96,33 +96,30 @@
//!
//! [`UpdateListener`]: UpdateListener
//! [`polling_default`]: polling_default
//! [`polling`]: polling
//! [`polling`]: polling()
//! [`Box::get_updates`]: crate::requests::Requester::get_updates
//! [getting updates]: https://core.telegram.org/bots/api#getting-updates
//! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
//! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science)
//! [webhook]: https://en.wikipedia.org/wiki/Webhook
use futures::{
future::{ready, Either},
stream, Stream, StreamExt,
};
use futures::Stream;
use std::{convert::TryInto, time::Duration};
use teloxide_core::{
payloads::GetUpdates,
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
};
use std::time::Duration;
use crate::dispatching::stop_token::{AsyncStopFlag, AsyncStopToken, StopToken};
use crate::{dispatching::stop_token::StopToken, types::Update};
mod polling;
mod stateful_listener;
pub use self::polling::{polling, polling_default};
/// An update listener.
///
/// Implementors of this trait allow getting updates from Telegram.
///
/// Currently Telegram has 2 ways of getting updates -- [polling] and
/// [webhooks]. Currently, only the former one is implemented (see [`polling`]
/// [webhooks]. Currently, only the former one is implemented (see [`polling()`]
/// and [`polling_default`])
///
/// Some functions of this trait are located in the supertrait
@ -156,7 +153,7 @@ pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
/// Timeout duration hint.
///
/// This hints how often dispatcher should check for shutdown. E.g. for
/// [`polling`] this returns the [`timeout`].
/// [`polling()`] this returns the [`timeout`].
///
/// [`timeout`]: crate::payloads::GetUpdates::timeout
///
@ -179,214 +176,3 @@ pub trait AsUpdateStream<'a, E> {
/// [`Stream`]: AsUpdateStream::Stream
fn as_stream(&'a mut self) -> Self::Stream;
}
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
}
/// Returns a long/short polling update listener with some additional options.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::payloads::GetUpdates
pub fn polling<R>(
requester: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
struct State<B: Requester> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
flag: AsyncStopFlag,
token: AsyncStopToken,
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_
where
B: Requester,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;
if flag.is_stopped() {
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: Some(0),
limit: Some(1),
allowed_updates: allowed_updates.take(),
};
return match req.send().await {
Ok(_) => None,
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
};
}
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};
let updates = match req.send().await {
Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)),
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
*offset = id + 1;
}
for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok)
}
};
Some((Either::Right(stream::iter(updates)), state))
})
.flatten()
}
let (token, flag) = AsyncStopToken::new_pair();
let state = State {
bot: requester,
timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit,
allowed_updates,
offset: 0,
flag,
token,
};
let stop = |st: &mut State<_>| st.token.clone();
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener { state, stream, stop, timeout_hint }
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = !webhook_info.url.is_empty();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
}
/// A listener created from `state` and `stream`/`stop` functions.
struct StatefulListener<St, Assf, Sf, Thf> {
/// The state of the listener.
state: St,
/// Function used as `AsUpdateStream::as_stream`.
///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`.
stream: Assf,
/// Function used as `UpdateListener::stop`.
///
/// Must be of type `for<'a> &'a mut St -> impl StopToken`.
stop: Sf,
/// Function used as `UpdateListener::timeout_hint`.
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
/// `&`.
timeout_hint: Option<Thf>,
}
impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf>
where
(St, Strm): 'a,
Assf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>,
{
type Stream = Strm;
fn as_stream(&'a mut self) -> Self::Stream {
(self.stream)(&mut self.state)
}
}
impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf>
where
Self: for<'a> AsUpdateStream<'a, E>,
Sf: FnMut(&mut St) -> Stt,
Stt: StopToken,
Thf: Fn(&St) -> Option<Duration>,
{
type StopToken = Stt;
fn stop_token(&mut self) -> Stt {
(self.stop)(&mut self.state)
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout_hint.as_ref().and_then(|f| f(&self.state))
}
}

View file

@ -0,0 +1,173 @@
use std::{convert::TryInto, time::Duration};
use futures::{
future::{ready, Either},
stream::{self, Stream, StreamExt},
};
use crate::{
dispatching::{
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener},
},
payloads::GetUpdates,
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
};
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
}
/// Returns a long/short polling update listener with some additional options.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::payloads::GetUpdates
pub fn polling<R>(
requester: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
struct State<B: Requester> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
flag: AsyncStopFlag,
token: AsyncStopToken,
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_
where
B: Requester,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;
if flag.is_stopped() {
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: Some(0),
limit: Some(1),
allowed_updates: allowed_updates.take(),
};
return match req.send().await {
Ok(_) => None,
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
};
}
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};
let updates = match req.send().await {
Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)),
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
*offset = id + 1;
}
for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok)
}
};
Some((Either::Right(stream::iter(updates)), state))
})
.flatten()
}
let (token, flag) = AsyncStopToken::new_pair();
let state = State {
bot: requester,
timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit,
allowed_updates,
offset: 0,
flag,
token,
};
let stop = |st: &mut State<_>| st.token.clone();
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener { state, stream, stop, timeout_hint }
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = !webhook_info.url.is_empty();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
}

View file

@ -0,0 +1,63 @@
use std::time::Duration;
use futures::Stream;
use teloxide_core::types::Update;
use crate::dispatching::{
stop_token::StopToken,
update_listeners::{AsUpdateStream, UpdateListener},
};
/// A listener created from `state` and `stream`/`stop` functions.
pub(crate) struct StatefulListener<St, Assf, Sf, Thf> {
/// The state of the listener.
pub(crate) state: St,
/// Function used as `AsUpdateStream::as_stream`.
///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`.
pub(crate) stream: Assf,
/// Function used as `UpdateListener::stop`.
///
/// Must be of type `for<'a> &'a mut St -> impl StopToken`.
pub(crate) stop: Sf,
/// Function used as `UpdateListener::timeout_hint`.
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
/// `&`.
pub(crate) timeout_hint: Option<Thf>,
}
impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf>
where
(St, Strm): 'a,
Assf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>,
{
type Stream = Strm;
fn as_stream(&'a mut self) -> Self::Stream {
(self.stream)(&mut self.state)
}
}
impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf>
where
Self: for<'a> AsUpdateStream<'a, E>,
Sf: FnMut(&mut St) -> Stt,
Stt: StopToken,
Thf: Fn(&St) -> Option<Duration>,
{
type StopToken = Stt;
fn stop_token(&mut self) -> Stt {
(self.stop)(&mut self.state)
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout_hint.as_ref().and_then(|f| f(&self.state))
}
}