Add UpdateListener::timeout_hint

This commit is contained in:
Waffle 2021-05-18 17:29:45 +03:00
parent c288a540b9
commit 9f5a222ed7

View file

@ -141,6 +141,11 @@ pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
/// updates as soon as possible and return `None` from the update stream as /// updates as soon as possible and return `None` from the update stream as
/// soon as all cached updates are returned. /// soon as all cached updates are returned.
fn stop(&mut self); fn stop(&mut self);
/// Timeout duration hint.
fn timeout_hint(&self) -> Option<Duration> {
None
}
} }
/// [`UpdateListener`]'s supertrait/extension. /// [`UpdateListener`]'s supertrait/extension.
@ -313,11 +318,9 @@ where
.flatten() .flatten()
} }
let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
let state = State { let state = State {
bot: requester, bot: requester,
timeout, timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit, limit,
allowed_updates, allowed_updates,
offset: 0, offset: 0,
@ -325,11 +328,11 @@ where
fetched: None, fetched: None,
}; };
let stop = assert_stop_fn(|st: &mut State<_>| { let stop = Some(|st: &mut State<_>| st.run_state = RunningState::Stopping);
st.run_state = RunningState::Stopping;
});
StatefulListner { state, stream, stop } let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListner { state, stream, stop, timeout_hint }
} }
async fn delete_webhook_if_setup<R>(requester: &R) async fn delete_webhook_if_setup<R>(requester: &R)
@ -354,7 +357,7 @@ where
} }
/// A listner created from `state` and `stream`/`stop` functions. /// A listner created from `state` and `stream`/`stop` functions.
struct StatefulListner<St, Sf, F> { struct StatefulListner<St, Assf, Sf, Thf> {
/// The state of the listner. /// The state of the listner.
state: St, state: St,
@ -362,18 +365,24 @@ struct StatefulListner<St, Sf, F> {
/// ///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`. /// `&mut`.
stream: Sf, stream: Assf,
/// Function used as `UpdateListner::stop`. /// Function used as `UpdateListner::stop`.
/// ///
/// Must be of type `for<'a> &'a mut St`. /// Must be of type `for<'a> &'a mut St`.
stop: Option<F>, stop: Option<Sf>,
/// Function used as `UpdateListner::timeout_hint`.
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
/// `&`.
timeout_hint: Option<Thf>,
} }
impl<'a, St, Sf, F, Strm, E> AsUpdateStream<'a, E> for StatefulListner<St, Sf, F> impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListner<St, Assf, Sf, Thf>
where where
(St, Strm): 'a, (St, Strm): 'a,
Sf: FnMut(&'a mut St) -> Strm, Assf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>, Strm: Stream<Item = Result<Update, E>>,
{ {
type Stream = Strm; type Stream = Strm;
@ -383,21 +392,17 @@ where
} }
} }
impl<St, Sf, F, E> UpdateListener<E> for StatefulListner<St, Sf, F> impl<St, Assf, Sf, Thf, E> UpdateListener<E> for StatefulListner<St, Assf, Sf, Thf>
where where
Self: for<'a> AsUpdateStream<'a, E>, Self: for<'a> AsUpdateStream<'a, E>,
F: FnOnce(&mut St), Sf: FnOnce(&mut St),
Thf: Fn(&St) -> Option<Duration>,
{ {
fn stop(&mut self) { fn stop(&mut self) {
self.stop.take().map(|stop| stop(&mut self.state)); self.stop.take().map(|stop| stop(&mut self.state));
} }
}
/// Assert (at compile tume) that `f` is fine as a stop-function (closure fn timeout_hint(&self) -> Option<Duration> {
/// lifetime inference workaround). self.timeout_hint.as_ref().and_then(|f| f(&self.state))
fn assert_stop_fn<F, St>(f: F) -> Option<F> }
where
F: FnOnce(&mut St),
{
Some(f)
} }