Make StatefulListener pub

This commit is contained in:
Waffle 2021-05-22 00:37:16 +03:00
parent 881aa3d6b6
commit f58ae9b9ca
3 changed files with 71 additions and 13 deletions

View file

@ -112,7 +112,10 @@ use crate::{dispatching::stop_token::StopToken, types::Update};
mod polling;
mod stateful_listener;
pub use self::polling::{polling, polling_default};
pub use self::{
polling::{polling, polling_default},
stateful_listener::StatefulListener,
};
/// An update listener.
///

View file

@ -148,7 +148,7 @@ where
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener { state, stream, stop, timeout_hint }
StatefulListener { state, stream, stop_token: stop, timeout_hint }
}
async fn delete_webhook_if_setup<R>(requester: &R)

View file

@ -4,31 +4,79 @@ use futures::Stream;
use teloxide_core::types::Update;
use crate::dispatching::{
stop_token::StopToken,
stop_token::{self, StopToken},
update_listeners::{AsUpdateStream, UpdateListener},
};
/// A listener created from `state` and `stream`/`stop` functions.
pub(crate) struct StatefulListener<St, Assf, Sf, Thf> {
/// A listener created from functions.
///
/// This type allows to turn a stream of updates (+some additional functions)
/// into an [`UpdateListener`].
///
/// For an example of usage see [`polling`]
///
/// [`polling`]: crate::dispatching::update_listeners::polling()
#[non_exhaustive]
pub struct StatefulListener<St, Assf, Sf, Thf> {
/// The state of the listener.
pub(crate) state: St,
pub state: St,
/// Function used as `AsUpdateStream::as_stream`.
/// Function used as [`AsUpdateStream::as_stream`].
///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`.
pub(crate) stream: Assf,
pub stream: Assf,
/// Function used as `UpdateListener::stop`.
/// Function used as [`UpdateListener::stop_token`].
///
/// Must be of type `for<'a> &'a mut St -> impl StopToken`.
pub(crate) stop: Sf,
pub stop_token: Sf,
/// Function used as `UpdateListener::timeout_hint`.
/// Function used as [`UpdateListener::timeout_hint`].
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
/// `&`.
pub(crate) timeout_hint: Option<Thf>,
pub timeout_hint: Option<Thf>,
}
impl<St, Assf, Sf, Thf> StatefulListener<St, Assf, Sf, Thf> {
/// Creates new stateful listener from it's components.
pub fn new(state: St, stream: Assf, stop_token: Sf, timeout_hint: Option<Thf>) -> Self {
Self { state, stream, stop_token, timeout_hint }
}
}
impl<S, E>
StatefulListener<
S,
for<'a> fn(&'a mut S) -> &'a mut S,
for<'a> fn(&'a mut S) -> stop_token::Noop,
for<'a> fn(&'a S) -> Option<Duration>,
>
where
S: Stream<Item = Result<Update, E>> + Unpin + 'static,
{
/// Creates a new update listner from a stream of updates which ignore stop
/// signals.
///
/// It won't be possible to ever stop this listener with stop token.
pub fn from_stream_without_graceful_shutdown(stream: S) -> Self {
let this = Self {
state: stream,
stream: |s| s,
stop_token: |_| stop_token::Noop,
timeout_hint: Some(|_| {
// FIXME: replace this by just Duration::MAX once 1.53 releases
// be released
const NANOS_PER_SEC: u32 = 1_000_000_000;
let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1);
Some(dmax)
}),
};
assert_update_listener(this)
}
}
impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf>
@ -54,10 +102,17 @@ where
type StopToken = Stt;
fn stop_token(&mut self) -> Stt {
(self.stop)(&mut self.state)
(self.stop_token)(&mut self.state)
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout_hint.as_ref().and_then(|f| f(&self.state))
}
}
fn assert_update_listener<L, E>(l: L) -> L
where
L: UpdateListener<E>,
{
l
}