Do not retry get_updates infinitely while stopping polling listener

This commit is contained in:
Maybe Waffle 2022-01-17 14:42:49 +03:00
parent c12fae2008
commit 993e6e53aa
2 changed files with 18 additions and 2 deletions

View file

@ -10,6 +10,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Require that `AsUpdateStream::Stream` is `Send` - Require that `AsUpdateStream::Stream` is `Send`
### Fixed
- Infinite retries while stopping polling listener ([issue 496](https://github.com/teloxide/teloxide/issues/496))
## 0.5.3 - 2021-10-25 ## 0.5.3 - 2021-10-25
### Fixed ### Fixed

View file

@ -61,6 +61,7 @@ where
offset: i32, offset: i32,
flag: AsyncStopFlag, flag: AsyncStopFlag,
token: AsyncStopToken, token: AsyncStopToken,
force_stop: bool,
} }
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_ fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_
@ -69,7 +70,12 @@ where
<B as Requester>::GetUpdates: Send, <B as Requester>::GetUpdates: Send,
{ {
stream::unfold(st, move |state| async move { stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; let State { timeout, limit, allowed_updates, bot, offset, flag, force_stop, .. } =
&mut *state;
if *force_stop {
return None;
}
if flag.is_stopped() { if flag.is_stopped() {
let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1); let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1);
@ -77,7 +83,12 @@ where
return match req.send().await { return match req.send().await {
Ok(_) => None, Ok(_) => None,
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), Err(err) => {
// Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496
*force_stop = true;
Some((Either::Left(stream::once(ready(Err(err)))), state))
}
}; };
} }
@ -115,6 +126,7 @@ where
offset: 0, offset: 0,
flag, flag,
token, token,
force_stop: false,
}; };
let stop_token = |st: &mut State<_>| st.token.clone(); let stop_token = |st: &mut State<_>| st.token.clone();