Refactor UpdateListner trait

Instead of `Stream` super trait we now require `for<'a> AsUpdateStream<'a, E'>`
as a super trait. `AsUpdateStream` in turn provides `as_stream` function which
takes `&mut self` and returns `Self::Stream` (the trait is needed to workaround
lack of GAT).

This patch also adds a `stop` function that allows stopping the listener.

Since `UpdateListner` now isn't `Stream` and required methods, it's blanked
implementation for streams was removed.

`polling` and `polling_default` functions now also require `R: 'static`.
This commit is contained in:
Waffle 2021-05-18 14:46:45 +03:00
parent 59e5d385e7
commit aeca45a579
3 changed files with 171 additions and 21 deletions

View file

@ -18,6 +18,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Return an error from `Storage::remove_dialogue` if a dialogue does not exist.
- Require `D: Clone` in `dialogues_repl(_with_listener)` and `InMemStorage`.
- Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)).
- `polling` and `polling_default` now require `R: 'static`
- Refactor `UpdateListener` trait:
- Add a `stop` function that allows stopping the listener.
- Remove blanked implementation.
- Remove `Stream` from super traits.
- Add `AsUpdateStream` to super traits.
- Add an `AsUpdateStream` trait that allows turning implementors into streams of updates (GAT workaround).
### Fixed

View file

@ -243,7 +243,7 @@ where
/// `update_listener_error_handler`.
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a self,
update_listener: UListener,
mut update_listener: UListener,
update_listener_error_handler: Arc<Eh>,
) where
UListener: UpdateListener<ListenerE> + 'a,
@ -251,9 +251,8 @@ where
ListenerE: Debug,
R: Requester + Clone,
{
let update_listener = Box::pin(update_listener);
update_listener
.as_stream()
.for_each(move |update| {
let update_listener_error_handler = Arc::clone(&update_listener_error_handler);

View file

@ -111,11 +111,50 @@ use teloxide_core::{
types::{AllowedUpdate, SemiparsedVec, Update},
};
/// A generic update listener.
pub trait UpdateListener<E>: Stream<Item = Result<Update, E>> {
// TODO: add some methods here (.shutdown(), etc).
/// An update listener.
///
/// Implementors of this trait allow getting updates from Telegram.
///
/// Currently Telegram has 2 ways of getting updates -- [polling] and
/// [webhooks]. Currently, only the former one is implemented (see [`polling`]
/// and [`polling_default`])
///
/// Some functions of this trait are located in the supertrait
/// ([`AsUpdateStream`]), see also:
/// - [`Stream`]
/// - [`as_stream`]
///
/// [polling]: self#long-polling
/// [webhooks]: self#webhooks
/// [`Stream`]: AsUpdateStream::Stream
/// [`as_stream`]: AsUpdateStream::as_stream
pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
/// Stop listening for updates.
///
/// This function is not guaranteed to have an immidiate effect. That is
/// some listners can return updates even after [`stop`] is called (e.g.:
/// because of buffering).
///
/// [`stop`]: UpdateListener::stop
///
/// Implementors of this function are encouraged to stop listening for
/// updates as soon as possible and return `None` from the update stream as
/// soon as all cached updates are returned.
fn stop(&mut self);
}
/// [`UpdateListener`]'s supertrait/extension.
///
/// This trait is a workaround to not require GAT.
pub trait AsUpdateStream<'a, E> {
/// Stream of updates from Telegram.
type Stream: Stream<Item = Result<Update, E>> + 'a;
/// Creates the update [`Stream`].
///
/// [`Stream`]: AsUpdateStream::Stream
fn as_stream(&'a mut self) -> Self::Stream;
}
impl<S, E> UpdateListener<E> for S where S: Stream<Item = Result<Update, E>> {}
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
@ -126,7 +165,7 @@ impl<S, E> UpdateListener<E> for S where S: Stream<Item = Result<Update, E>> {}
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester,
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
@ -152,19 +191,58 @@ pub fn polling<R>(
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester,
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
enum RunningState {
Polling,
Stopping,
Stopped,
}
struct State<B> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
run_state: RunningState,
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_
where
B: Requester,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, run_state, .. } = &mut *state;
match run_state {
RunningState::Polling => {}
RunningState::Stopped => return None,
RunningState::Stopping => {
let mut req = bot.get_updates_fault_tolerant();
let payload = &mut req.payload_mut().0;
payload.offset = Some(*offset);
payload.timeout = *timeout;
payload.limit = Some(1);
payload.allowed_updates = allowed_updates.take();
return match req.send().await {
Ok(_) => {
*run_state = RunningState::Stopped;
None
}
Err(err) => Some((stream::iter(vec![Err(err)]), state)),
};
}
}
stream::unfold(
(allowed_updates, requester, 0),
move |(mut allowed_updates, bot, mut offset)| async move {
let mut req = bot.get_updates_fault_tolerant();
let payload = &mut req.payload_mut().0;
payload.offset = Some(offset);
payload.timeout = timeout;
payload.limit = limit;
payload.offset = Some(*offset);
payload.timeout = *timeout;
payload.limit = *limit;
payload.allowed_updates = allowed_updates.take();
let updates = match req.send().await {
@ -181,7 +259,7 @@ where
.expect("update_id must be i32"),
};
offset = id + 1;
*offset = id + 1;
}
for update in &updates {
@ -200,10 +278,27 @@ where
}
};
Some((stream::iter(updates), (allowed_updates, bot, offset)))
},
)
.flatten()
Some((stream::iter(updates), state))
})
.flatten()
}
let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
let state = State {
bot: requester,
timeout,
limit,
allowed_updates,
offset: 0,
run_state: RunningState::Polling,
};
let stop = assert_stop_fn(|st: &mut State<_>| {
st.run_state = RunningState::Stopping;
});
StatefulListner { state, stream, stop }
}
async fn delete_webhook_if_setup<R>(requester: &R)
@ -226,3 +321,52 @@ where
}
}
}
/// A listner created from `state` and `stream`/`stop` functions.
struct StatefulListner<St, Sf, F> {
/// The state of the listner.
state: St,
/// Function used as `AsUpdateStream::as_stream`.
///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`.
stream: Sf,
/// Function used as `UpdateListner::stop`.
///
/// Must be of type `for<'a> &'a mut St`.
stop: Option<F>,
}
impl<'a, St, Sf, F, Strm, E> AsUpdateStream<'a, E> for StatefulListner<St, Sf, F>
where
(St, Strm): 'a,
Sf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>,
{
type Stream = Strm;
fn as_stream(&'a mut self) -> Self::Stream {
(self.stream)(&mut self.state)
}
}
impl<St, Sf, F, E> UpdateListener<E> for StatefulListner<St, Sf, F>
where
Self: for<'a> AsUpdateStream<'a, E>,
F: FnOnce(&mut St),
{
fn stop(&mut self) {
self.stop.take().map(|stop| stop(&mut self.state));
}
}
/// Assert (at compile tume) that `f` is fine as a stop-function (closure
/// lifetime inference workaround).
fn assert_stop_fn<F, St>(f: F) -> Option<F>
where
F: FnOnce(&mut St),
{
Some(f)
}