Merge pull request #662 from teloxide/polling_builder

Add `PollingBuilder` & friends

Former-commit-id: 3d4d506203
This commit is contained in:
Waffle Maybe 2022-07-05 00:20:54 +04:00 committed by GitHub
commit e590736585
3 changed files with 335 additions and 160 deletions

View file

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Security checks based on `secret_token` param of `set_webhook` to built-in webhooks
- `dispatching::update_listeners::{PollingBuilder, Polling, PollingStream}`
### Fixed
@ -17,6 +18,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Add the `Key: Clone` requirement for `impl Dispatcher` [**BC**].
- `dispatching::update_listeners::{polling_default, polling}` now return a named, `Polling<_>` type
### Deprecated
- `dispatching::update_listeners::polling`
## 0.9.2 - 2022-06-07

View file

@ -42,8 +42,9 @@ use crate::{
mod polling;
mod stateful_listener;
#[allow(deprecated)]
pub use self::{
polling::{polling, polling_default},
polling::{polling, polling_default, Polling, PollingBuilder, PollingStream},
stateful_listener::StatefulListener,
};
@ -125,3 +126,11 @@ pub trait AsUpdateStream<'a, E> {
/// [`Stream`]: AsUpdateStream::Stream
fn as_stream(&'a mut self) -> Self::Stream;
}
#[inline(always)]
pub(crate) fn assert_update_listener<L, E>(listener: L) -> L
where
L: UpdateListener<E>,
{
listener
}

View file

@ -1,89 +1,200 @@
use std::{convert::TryInto, time::Duration};
use futures::{
future::{ready, Either},
stream::{self, Stream, StreamExt},
use std::{
convert::TryInto,
future::Future,
pin::Pin,
task::{
self,
Poll::{self, Ready},
},
time::Duration,
vec,
};
use futures::{ready, stream::Stream};
use crate::{
dispatching::{
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener},
update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
},
payloads::{GetUpdates, GetUpdatesSetters as _},
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, Update},
};
/// Returns a long polling update listener with `timeout` of 10 seconds.
/// Builder for polling update listener.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
/// Can be created by [`Polling::builder`].
#[non_exhaustive]
#[must_use = "`PollingBuilder` is a builder and does nothing unless used"]
pub struct PollingBuilder<R> {
pub bot: R,
pub timeout: Option<Duration>,
pub limit: Option<u8>,
pub allowed_updates: Option<Vec<AllowedUpdate>>,
pub drop_pending_updates: bool,
}
impl<R> PollingBuilder<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
/// A timeout in seconds for polling.
///
/// ## Note
///
/// `timeout` should not be bigger than http client timeout, see
/// [`default_reqwest_settings`] for default http client settings.
///
/// [`default_reqwest_settings`]: crate::net::default_reqwest_settings
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout: Some(timeout), ..self }
}
/// Limit the number of updates to be retrieved at once. Values between
/// 1—100 are accepted.
///
/// ## Panics
///
/// If `limit` is 0 or greater than 100.
#[track_caller]
pub fn limit(self, limit: u8) -> Self {
assert_ne!(limit, 0, "limit can't be 0");
assert!(limit <= 100, "maximum limit is 100, can't set limit to `{limit}`");
Self { limit: Some(limit), ..self }
}
/// A list of the types of updates you want to receive.
///
/// ## Note
///
/// Teloxide normally (when using [`Dispatcher`] or [`repl`]s) sets this
/// automatically via [`hint_allowed_updates`], so you rarely need to use
/// `allowed_updates` explicitly.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`repl`]: fn@crate::repl
/// [`hint_allowed_updates`]: crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates
pub fn allowed_updates(self, allowed_updates: Vec<AllowedUpdate>) -> Self {
Self { allowed_updates: Some(allowed_updates), ..self }
}
/// Drops pending updates.
pub fn drop_pending_updates(self) -> Self {
Self { drop_pending_updates: true, ..self }
}
/// Deletes webhook if it was set up.
pub async fn delete_webhook(self) -> Self {
delete_webhook_if_setup(&self.bot).await;
self
}
/// Returns a long polling update listener with configuration from the
/// builder.
///
/// See also: [`polling_default`], [`Polling`].
pub fn build(self) -> Polling<R> {
let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
let (token, flag) = AsyncStopToken::new_pair();
let polling =
Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token };
assert_update_listener(polling)
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Returns a long polling update listener with some additional options.
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout in seconds for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
///
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
/// See also: [`Polling::builder`].
///
/// ## Notes
///
/// - `timeout` should not be bigger than http client timeout, see
/// [`default_reqwest_settings`] for default http client settings.
/// - [`repl`]s and [`Dispatcher`] use [`hint_allowed_updates`] to set
/// `allowed_updates`, so you rarely need to pass `allowed_updates`
/// explicitly.
///
/// [`default_reqwest_settings`]: teloxide::net::default_reqwest_settings
/// [`repl`]: fn@crate::repl
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`hint_allowed_updates`]:
/// crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(bot: R) -> Polling<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
let polling =
Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build();
assert_update_listener(polling)
}
/// Returns a long polling update listener with some additional options.
#[deprecated(since = "0.10.0", note = "use `Polling::builder()` instead")]
pub fn polling<R>(
bot: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> Polling<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
let mut builder = Polling::builder(bot);
builder.timeout = timeout;
builder.limit = limit;
builder.allowed_updates = allowed_updates;
assert_update_listener(builder.build())
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = webhook_info.url.is_some();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A polling update listener.
///
/// ## How it works
///
/// Long polling works by repeatedly calling [`Bot::get_updates`][get_updates].
/// If telegram has any updates, it returns them immediately, otherwise it waits
/// until either it has any updates or `timeout` expires.
/// Long polling works by repeatedly calling
/// [`Bot::get_updates`][get_updates]. If telegram has any updates, it
/// returns them immediately, otherwise it waits until either it has any
/// updates or `timeout` expires.
///
/// Each [`get_updates`][get_updates] call includes an `offset` parameter equal
/// to the latest update id + one, that allows to only receive updates that has
/// not been received before.
/// Each [`get_updates`][get_updates] call includes an `offset` parameter
/// equal to the latest update id + one, that allows to only receive
/// updates that has not been received before.
///
/// When telegram receives a [`get_updates`][get_updates] request with `offset =
/// N` it forgets any updates with id < `N`. When `polling` listener is stopped,
/// it sends [`get_updates`][get_updates] with `timeout = 0, limit = 1` and
/// appropriate `offset`, so future bot restarts won't see updates that were
/// already seen.
/// When telegram receives a [`get_updates`][get_updates] request with
/// `offset = N` it forgets any updates with id < `N`. When `polling`
/// listener is stopped, it sends [`get_updates`][get_updates] with
/// `timeout = 0, limit = 1` and appropriate `offset`, so future bot
/// restarts won't see updates that were already seen.
///
/// Consumers of a `polling` update listener then need to repeatedly call
/// Consumers of a [`Polling`] update listener then need to repeatedly call
/// [`futures::StreamExt::next`] to get the updates.
///
/// Here is an example diagram that shows these interactions between consumers
/// like [`Dispatcher`], `polling` update listener and telegram.
/// Here is an example diagram that shows these interactions between
/// consumers like [`Dispatcher`], [`Polling`] update listener and
/// telegram.
///
/// ```mermaid
/// sequenceDiagram
/// participant C as Consumer
/// participant P as polling
/// participant P as Polling
/// participant T as Telegram
///
/// link C: Dispatcher @ ../struct.Dispatcher.html
@ -123,131 +234,180 @@ where
/// ```
///
/// [get_updates]: crate::requests::Requester::get_updates
pub fn polling<R>(
bot: R,
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[must_use = "`Polling` is an update listener and does nothing unless used"]
pub struct Polling<B: Requester> {
bot: B,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
drop_pending_updates: bool,
flag: AsyncStopFlag,
token: AsyncStopToken,
}
impl<R> Polling<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
struct State<B: Requester> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
flag: AsyncStopFlag,
token: AsyncStopToken,
force_stop: bool,
/// Returns a builder for polling update listener.
pub fn builder(bot: R) -> PollingBuilder<R> {
PollingBuilder {
bot,
timeout: None,
limit: None,
allowed_updates: None,
drop_pending_updates: false,
}
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_
where
B: Requester + Send,
<B as Requester>::GetUpdates: Send,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, force_stop, .. } =
&mut *state;
if *force_stop {
return None;
}
if flag.is_stopped() {
let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1);
req.payload_mut().allowed_updates = allowed_updates.take();
return match req.send().await {
Ok(_) => None,
Err(err) => {
// Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496
*force_stop = true;
Some((Either::Left(stream::once(ready(Err(err)))), state))
}
};
}
let mut req = bot.get_updates();
*req.payload_mut() = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};
match req.send().await {
Ok(updates) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
*offset = upd.id + 1;
}
let updates = updates.into_iter().map(Ok);
Some((Either::Right(stream::iter(updates)), state))
}
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
}
})
.flatten()
}
let (token, flag) = AsyncStopToken::new_pair();
let state = State {
bot,
timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit,
allowed_updates,
offset: 0,
flag,
token,
force_stop: false,
};
let stop_token = |st: &mut State<_>| st.token.clone();
let hint_allowed_updates =
Some(|state: &mut State<_>, allowed: &mut dyn Iterator<Item = AllowedUpdate>| {
// TODO: we should probably warn if there already were different allowed updates
// before
state.allowed_updates = Some(allowed.collect());
});
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint)
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
#[pin_project::pin_project]
pub struct PollingStream<'a, B: Requester> {
/// Parent structure
polling: &'a mut Polling<B>,
let is_webhook_setup = webhook_info.url.is_some();
/// Whatever to drop pending updates or not.
drop_pending_updates: bool,
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
/// Timeout parameter for normal `get_updates()` calls.
timeout: Option<u32>,
/// Allowed updates parameter for the first `get_updates()` call.
allowed_updates: Option<Vec<AllowedUpdate>>,
/// Offset parameter for normal `get_updates()` calls.
offset: i32,
/// If this is set, return `None` from `poll_next` immediately.
force_stop: bool,
/// If true we've sent last `get_updates()` call for graceful shutdown.
stopping: bool,
/// Buffer of updates to be yielded.
buffer: vec::IntoIter<Update>,
/// In-flight `get_updates()` call.
#[pin]
in_flight: Option<<B::GetUpdates as Request>::Send>,
}
impl<B: Requester + Send + 'static> UpdateListener<B::Err> for Polling<B> {
type StopToken = AsyncStopToken;
fn stop_token(&mut self) -> Self::StopToken {
self.token.clone()
}
fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) {
// TODO: we should probably warn if there already were different allowed updates
// before
self.allowed_updates = Some(hint.collect());
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout
}
}
impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling<B> {
type Stream = PollingStream<'a, B>;
fn as_stream(&'a mut self) -> Self::Stream {
let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
let allowed_updates = self.allowed_updates.clone();
let drop_pending_updates = self.drop_pending_updates;
PollingStream {
polling: self,
drop_pending_updates,
timeout,
allowed_updates,
offset: 0,
force_stop: false,
stopping: false,
buffer: Vec::new().into_iter(),
in_flight: None,
}
}
}
impl<B: Requester> Stream for PollingStream<'_, B> {
type Item = Result<Update, B::Err>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
if *this.force_stop {
return Ready(None);
}
// Poll in-flight future until completion
if let Some(in_flight) = this.in_flight.as_mut().as_pin_mut() {
let res = ready!(in_flight.poll(cx));
this.in_flight.set(None);
match res {
Ok(_) if *this.stopping => return Ready(None),
Err(err) if *this.stopping => {
// Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496
*this.force_stop = true;
return Ready(Some(Err(err)));
}
Ok(updates) => {
if let Some(upd) = updates.last() {
*this.offset = upd.id + 1;
}
match *this.drop_pending_updates {
false => *this.buffer = updates.into_iter(),
true => *this.drop_pending_updates = false,
}
}
Err(err) => return Ready(Some(Err(err))),
}
}
// If there are any buffered updates, return one
if let Some(upd) = this.buffer.next() {
return Ready(Some(Ok(upd)));
}
*this.stopping = this.polling.flag.is_stopped();
let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {
// Normal `get_updates()` call
(false, false) => (*this.offset, this.polling.limit, *this.timeout),
// Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending
// updates)
//
// When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()`
// set last seen update (offset) and return immediately
(true, _) => (*this.offset, Some(1), Some(0)),
// Drop pending updates
(_, true) => (-1, Some(1), Some(0)),
};
let req = this
.polling
.bot
.get_updates()
.with_payload_mut(|pay| {
pay.offset = Some(offset);
pay.timeout = timeout;
pay.limit = limit;
pay.allowed_updates = this.allowed_updates.take();
})
.send();
this.in_flight.set(Some(req));
// Recurse to poll `self.in_flight`
self.poll_next(cx)
}
}
#[test]
fn polling_is_send() {
use crate::dispatching::update_listeners::AsUpdateStream;
let bot = crate::Bot::new("TOKEN");
#[allow(deprecated)]
let mut polling = polling(bot, None, None, None);
assert_send(&polling);