Require that AsUpdateStream::Stream is Send

This commit is contained in:
Waffle 2021-09-04 14:02:56 +03:00
parent 18cfb675bf
commit a97050de24
4 changed files with 32 additions and 6 deletions

View file

@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## unreleased
### Changed
- Require that `AsUpdateStream::Stream` is `Send`
## 0.5.2 - 2021-08-25
### Fixed

View file

@ -188,7 +188,14 @@ pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
/// This trait is a workaround to not require GAT.
pub trait AsUpdateStream<'a, E> {
/// The stream of updates from Telegram.
type Stream: Stream<Item = Result<Update, E>> + 'a;
// HACK: There is currently no way to write something like
// `-> impl for<'a> AsUpdateStream<'a, E, Stream: Send>`. Since we return
// `impl UpdateListener<E>` from `polling`, we need to have `Send` bound here,
// to make the stream `Send`.
//
// Without this it's, for example, impossible to spawn a tokio task with
// teloxide polling.
type Stream: Stream<Item = Result<Update, E>> + Send + 'a;
/// Creates the update [`Stream`].
///

View file

@ -24,7 +24,7 @@ use crate::{
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
R: Requester + Send + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
@ -50,7 +50,7 @@ pub fn polling<R>(
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
R: Requester + Send + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
struct State<B: Requester> {
@ -63,9 +63,10 @@ where
token: AsyncStopToken,
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_
where
B: Requester,
B: Requester + Send,
<B as Requester>::GetUpdatesFaultTolerant: Send,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;
@ -177,3 +178,16 @@ where
}
}
}
#[test]
fn polling_is_send() {
use crate::dispatching::update_listeners::AsUpdateStream;
let bot = crate::Bot::new("TOKEN");
let mut polling = polling(bot, None, None, None);
assert_send(&polling);
assert_send(&polling.as_stream());
fn assert_send(_: &impl Send) {}
}

View file

@ -79,7 +79,7 @@ impl<S, E>
Thfn<S>,
>
where
S: Stream<Item = Result<Update, E>> + Unpin + 'static,
S: Stream<Item = Result<Update, E>> + Unpin + Send + 'static,
{
/// Creates a new update listener from a stream of updates which ignores
/// stop signals.
@ -109,6 +109,7 @@ impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a, E>
for StatefulListener<St, Assf, Hauf, Sf, Thf>
where
(St, Strm): 'a,
Strm: Send,
Assf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>,
{