From d5392fddabc4210029759c765fca91846c27aa50 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Wed, 12 Jan 2022 12:50:35 +0300 Subject: [PATCH] Update `teloxide-core` yet again --- Cargo.toml | 2 +- src/dispatching/dispatcher.rs | 10 +++- src/dispatching/repls/commands_repl.rs | 2 +- src/dispatching/repls/dialogues_repl.rs | 2 +- src/dispatching/repls/repl.rs | 2 +- src/dispatching/update_listeners/polling.rs | 60 ++++++--------------- 6 files changed, 29 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fdee6fdb..ccf5208a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ full = [ [dependencies] #teloxide-core = { version = "0.3.3", default-features = false } -teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "1219c8fbe26c270e8106396dfd3adbc937265b60", default-features = false } +teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "dad5d5d", default-features = false } teloxide-macros = { version = "0.4", optional = true } serde_json = "1.0" diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 756ebd63..dc1b1b00 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -266,7 +266,7 @@ where pub async fn dispatch(&mut self) where R: Requester + Clone, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let listener = update_listeners::polling_default(self.requester.clone()).await; let error_handler = @@ -455,6 +455,14 @@ where chat_join_request, "UpdateKind::ChatJoinRequest", ), + UpdateKind::Error(err) => { + log::error!( + "Cannot parse an update.\nError: {:?}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide-core/issues.", + err, + ); + } } } } diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index c1799d58..09fc1c4a 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -31,7 +31,7 @@ where HandlerE: Debug + Send, N: Into + Send + 'static, R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let cloned_requester = requester.clone(); diff --git a/src/dispatching/repls/dialogues_repl.rs b/src/dispatching/repls/dialogues_repl.rs index 345c6bcb..f0819183 100644 --- a/src/dispatching/repls/dialogues_repl.rs +++ b/src/dispatching/repls/dialogues_repl.rs @@ -29,7 +29,7 @@ where D: Clone + Default + Send + 'static, Fut: Future> + Send + 'static, R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let cloned_requester = requester.clone(); diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index 635a030c..b8ea3a3e 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -28,7 +28,7 @@ where Result<(), E>: OnError, E: Debug + Send, R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let cloned_requester = requester.clone(); repl_with_listener( diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 0690bd30..b8ca9f90 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -10,9 +10,9 @@ use crate::{ stop_token::{AsyncStopFlag, AsyncStopToken}, update_listeners::{stateful_listener::StatefulListener, UpdateListener}, }, - payloads::GetUpdates, + payloads::{GetUpdates, GetUpdatesSetters as _}, requests::{HasPayload, Request, Requester}, - types::{AllowedUpdate, SemiparsedVec, Update}, + types::{AllowedUpdate, Update}, }; /// Returns a long polling update listener with `timeout` of 10 seconds. @@ -25,7 +25,7 @@ use crate::{ pub async fn polling_default(requester: R) -> impl UpdateListener where R: Requester + Send + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { delete_webhook_if_setup(&requester).await; polling(requester, Some(Duration::from_secs(10)), None, None) @@ -51,7 +51,7 @@ pub fn polling( ) -> impl UpdateListener where R: Requester + Send + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { struct State { bot: B, @@ -66,20 +66,14 @@ where fn stream(st: &mut State) -> impl Stream> + Send + '_ where B: Requester + Send, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { stream::unfold(st, move |state| async move { let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; if flag.is_stopped() { - let mut req = bot.get_updates_fault_tolerant(); - - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: Some(0), - limit: Some(1), - allowed_updates: allowed_updates.take(), - }; + let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1); + req.payload_mut().allowed_updates = allowed_updates.take(); return match req.send().await { Ok(_) => None, @@ -87,48 +81,26 @@ where }; } - let mut req = bot.get_updates_fault_tolerant(); - req.payload_mut().0 = GetUpdates { + let mut req = bot.get_updates(); + *req.payload_mut() = GetUpdates { offset: Some(*offset), timeout: *timeout, limit: *limit, allowed_updates: allowed_updates.take(), }; - let updates = match req.send().await { - Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), - Ok(SemiparsedVec(updates)) => { + match req.send().await { + Ok(updates) => { // Set offset to the last update's id + 1 if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), - }; - - *offset = id + 1; + *offset = upd.id + 1; } - for update in &updates { - if let Err((value, e)) = update { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide-core/issues.", - e, - value - ); - } - } - - updates.into_iter().filter_map(Result::ok).map(Ok) + let updates = updates.into_iter().map(Ok); + Some((Either::Right(stream::iter(updates)), state)) } - }; - - Some((Either::Right(stream::iter(updates)), state)) + Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), + } }) .flatten() }