Update teloxide-core yet again

This commit is contained in:
Maybe Waffle 2022-01-12 12:50:35 +03:00
parent edecf5975f
commit d5392fddab
6 changed files with 29 additions and 49 deletions

View file

@ -56,7 +56,7 @@ full = [
[dependencies] [dependencies]
#teloxide-core = { version = "0.3.3", default-features = false } #teloxide-core = { version = "0.3.3", default-features = false }
teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "1219c8fbe26c270e8106396dfd3adbc937265b60", default-features = false } teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "dad5d5d", default-features = false }
teloxide-macros = { version = "0.4", optional = true } teloxide-macros = { version = "0.4", optional = true }
serde_json = "1.0" serde_json = "1.0"

View file

@ -266,7 +266,7 @@ where
pub async fn dispatch(&mut self) pub async fn dispatch(&mut self)
where where
R: Requester + Clone, R: Requester + Clone,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdates: Send,
{ {
let listener = update_listeners::polling_default(self.requester.clone()).await; let listener = update_listeners::polling_default(self.requester.clone()).await;
let error_handler = let error_handler =
@ -455,6 +455,14 @@ where
chat_join_request, chat_join_request,
"UpdateKind::ChatJoinRequest", "UpdateKind::ChatJoinRequest",
), ),
UpdateKind::Error(err) => {
log::error!(
"Cannot parse an update.\nError: {:?}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
err,
);
}
} }
} }
} }

View file

@ -31,7 +31,7 @@ where
HandlerE: Debug + Send, HandlerE: Debug + Send,
N: Into<String> + Send + 'static, N: Into<String> + Send + 'static,
R: Requester + Send + Clone + 'static, R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdates: Send,
{ {
let cloned_requester = requester.clone(); let cloned_requester = requester.clone();

View file

@ -29,7 +29,7 @@ where
D: Clone + Default + Send + 'static, D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static, Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static, R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdates: Send,
{ {
let cloned_requester = requester.clone(); let cloned_requester = requester.clone();

View file

@ -28,7 +28,7 @@ where
Result<(), E>: OnError<E>, Result<(), E>: OnError<E>,
E: Debug + Send, E: Debug + Send,
R: Requester + Send + Clone + 'static, R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdates: Send,
{ {
let cloned_requester = requester.clone(); let cloned_requester = requester.clone();
repl_with_listener( repl_with_listener(

View file

@ -10,9 +10,9 @@ use crate::{
stop_token::{AsyncStopFlag, AsyncStopToken}, stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener}, update_listeners::{stateful_listener::StatefulListener, UpdateListener},
}, },
payloads::GetUpdates, payloads::{GetUpdates, GetUpdatesSetters as _},
requests::{HasPayload, Request, Requester}, requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update}, types::{AllowedUpdate, Update},
}; };
/// Returns a long polling update listener with `timeout` of 10 seconds. /// Returns a long polling update listener with `timeout` of 10 seconds.
@ -25,7 +25,7 @@ use crate::{
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err> pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where where
R: Requester + Send + 'static, R: Requester + Send + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdates: Send,
{ {
delete_webhook_if_setup(&requester).await; delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None) polling(requester, Some(Duration::from_secs(10)), None, None)
@ -51,7 +51,7 @@ pub fn polling<R>(
) -> impl UpdateListener<R::Err> ) -> impl UpdateListener<R::Err>
where where
R: Requester + Send + 'static, R: Requester + Send + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdates: Send,
{ {
struct State<B: Requester> { struct State<B: Requester> {
bot: B, bot: B,
@ -66,20 +66,14 @@ where
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_ fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_
where where
B: Requester + Send, B: Requester + Send,
<B as Requester>::GetUpdatesFaultTolerant: Send, <B as Requester>::GetUpdates: Send,
{ {
stream::unfold(st, move |state| async move { stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;
if flag.is_stopped() { if flag.is_stopped() {
let mut req = bot.get_updates_fault_tolerant(); let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1);
req.payload_mut().allowed_updates = allowed_updates.take();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: Some(0),
limit: Some(1),
allowed_updates: allowed_updates.take(),
};
return match req.send().await { return match req.send().await {
Ok(_) => None, Ok(_) => None,
@ -87,48 +81,26 @@ where
}; };
} }
let mut req = bot.get_updates_fault_tolerant(); let mut req = bot.get_updates();
req.payload_mut().0 = GetUpdates { *req.payload_mut() = GetUpdates {
offset: Some(*offset), offset: Some(*offset),
timeout: *timeout, timeout: *timeout,
limit: *limit, limit: *limit,
allowed_updates: allowed_updates.take(), allowed_updates: allowed_updates.take(),
}; };
let updates = match req.send().await { match req.send().await {
Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), Ok(updates) => {
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1 // Set offset to the last update's id + 1
if let Some(upd) = updates.last() { if let Some(upd) = updates.last() {
let id: i32 = match upd { *offset = upd.id + 1;
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
*offset = id + 1;
} }
for update in &updates { let updates = updates.into_iter().map(Ok);
if let Err((value, e)) = update { Some((Either::Right(stream::iter(updates)), state))
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok)
} }
}; Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
}
Some((Either::Right(stream::iter(updates)), state))
}) })
.flatten() .flatten()
} }