Rewrite hacks for reliable Update deserialization

This commit is contained in:
Maybe Waffle 2023-01-31 13:25:58 +04:00
parent 6909353500
commit cbbfc5b3d1
5 changed files with 108 additions and 85 deletions

View file

@ -10,8 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- `Update::user` now handles channel posts, chat member changes and chat join request updates correctly ([#835][pr835]) - `Update::user` now handles channel posts, chat member changes and chat join request updates correctly ([#835][pr835])
- In cases when `teloxide` can't deserialize an update, error now includes the full json value ([#826][pr826])
[pr835]: https://github.com/teloxide/teloxide/pull/835 [pr835]: https://github.com/teloxide/teloxide/pull/835
[pr826]: https://github.com/teloxide/teloxide/pull/826
## 0.9.0 - 2023-01-17 ## 0.9.0 - 2023-01-17

View file

@ -206,7 +206,7 @@ impl Bot {
) -> impl Future<Output = ResponseResult<P::Output>> + 'static ) -> impl Future<Output = ResponseResult<P::Output>> + 'static
where where
P: Payload + Serialize, P: Payload + Serialize,
P::Output: DeserializeOwned, P::Output: DeserializeOwned + 'static,
{ {
let client = self.client.clone(); let client = self.client.clone();
let token = Arc::clone(&self.token); let token = Arc::clone(&self.token);
@ -237,7 +237,7 @@ impl Bot {
) -> impl Future<Output = ResponseResult<P::Output>> ) -> impl Future<Output = ResponseResult<P::Output>>
where where
P: MultipartPayload + Serialize, P: MultipartPayload + Serialize,
P::Output: DeserializeOwned, P::Output: DeserializeOwned + 'static,
{ {
let client = self.client.clone(); let client = self.client.clone();
let token = Arc::clone(&self.token); let token = Arc::clone(&self.token);
@ -267,7 +267,7 @@ impl Bot {
) -> impl Future<Output = ResponseResult<P::Output>> ) -> impl Future<Output = ResponseResult<P::Output>>
where where
P: MultipartPayload + Serialize, P: MultipartPayload + Serialize,
P::Output: DeserializeOwned, P::Output: DeserializeOwned + 'static,
{ {
let client = self.client.clone(); let client = self.client.clone();
let token = Arc::clone(&self.token); let token = Arc::clone(&self.token);

View file

@ -1,4 +1,4 @@
use std::time::Duration; use std::{any::TypeId, time::Duration};
use reqwest::{ use reqwest::{
header::{HeaderValue, CONTENT_TYPE}, header::{HeaderValue, CONTENT_TYPE},
@ -19,7 +19,7 @@ pub async fn request_multipart<T>(
_timeout_hint: Option<Duration>, _timeout_hint: Option<Duration>,
) -> ResponseResult<T> ) -> ResponseResult<T>
where where
T: DeserializeOwned, T: DeserializeOwned + 'static,
{ {
// Workaround for [#460] // Workaround for [#460]
// //
@ -58,7 +58,7 @@ pub async fn request_json<T>(
_timeout_hint: Option<Duration>, _timeout_hint: Option<Duration>,
) -> ResponseResult<T> ) -> ResponseResult<T>
where where
T: DeserializeOwned, T: DeserializeOwned + 'static,
{ {
// Workaround for [#460] // Workaround for [#460]
// //
@ -91,7 +91,7 @@ where
async fn process_response<T>(response: Response) -> ResponseResult<T> async fn process_response<T>(response: Response) -> ResponseResult<T>
where where
T: DeserializeOwned, T: DeserializeOwned + 'static,
{ {
if response.status().is_server_error() { if response.status().is_server_error() {
tokio::time::sleep(DELAY_ON_SERVER_ERROR).await; tokio::time::sleep(DELAY_ON_SERVER_ERROR).await;
@ -100,6 +100,44 @@ where
let text = response.text().await?; let text = response.text().await?;
serde_json::from_str::<TelegramResponse<T>>(&text) serde_json::from_str::<TelegramResponse<T>>(&text)
.map(|mut response| {
use crate::types::{Update, UpdateKind};
use std::any::Any;
// HACK: Fill-in error information into `UpdateKind::Error`.
//
// Why? Well, we need `Update` deserialization to be reliable,
// even if Telegram breaks something in their Bot API, we want
// 1. Deserialization to """succeed"""
// 2. Get the `update.id`
//
// Both of these points are required for `get_updates(...) -> Vec<Update>`
// to behave well after Telegram introduces updates that we can't parse.
// (1.) makes it so only some of the updates in a butch need to be skipped
// (otherwise serde'll stop on the first error). (2.) allows us to issue
// the next `get_updates` call with the right offset, even if the last
// update in the batch didn't deserialize well.
//
// serde's interface doesn't allows us to implement `Deserialize` in such
// a way, that we could keep the data we couldn't parse, so our
// `Deserialize` impl for `UpdateKind` just returns
// `UpdateKind::Error(/* some empty-ish value */)`. Here, through some
// terrible hacks and downcasting, we fill-in the data we couldn't parse
// so that our users can make actionable bug reports.
if TypeId::of::<T>() == TypeId::of::<Update>() {
if let TelegramResponse::Ok { response, .. } = &mut response {
if let Some(update) =
(response as &mut T as &mut dyn Any).downcast_mut::<Update>()
{
if let UpdateKind::Error(value) = &mut update.kind {
*value = serde_json::from_str(&text).unwrap_or_default();
}
}
}
}
response
})
.map_err(|source| RequestError::InvalidJson { source, raw: text.into() })? .map_err(|source| RequestError::InvalidJson { source, raw: text.into() })?
.into() .into()
} }

View file

@ -156,7 +156,10 @@ pub enum UpdateKind {
/// An error that happened during deserialization. /// An error that happened during deserialization.
/// ///
/// This allows `teloxide` to continue working even if telegram adds a new /// This allows `teloxide` to continue working even if telegram adds a new
/// kind of updates. /// kinds of updates.
///
/// **Note that deserialize implementation always returns an empty value**,
/// teloxide fills in the data when doing deserialization.
Error(Value), Error(Value),
} }
@ -182,94 +185,63 @@ impl<'de> Deserialize<'de> for UpdateKind {
// Try to deserialize a borrowed-str key, or else try deserializing an owned // Try to deserialize a borrowed-str key, or else try deserializing an owned
// string key // string key
let k = map.next_key::<&str>().or_else(|_| { let key = map.next_key::<&str>().or_else(|_| {
map.next_key::<String>().map(|k| { map.next_key::<String>().map(|k| {
tmp = k; tmp = k;
tmp.as_deref() tmp.as_deref()
}) })
}); });
if let Ok(Some(k)) = k { let this = key
let res = match k { .ok()
"message" => { .flatten()
map.next_value::<Message>().map(UpdateKind::Message).map_err(|_| false) .and_then(|key| match key {
"message" => map.next_value::<Message>().ok().map(UpdateKind::Message),
"edited_message" => {
map.next_value::<Message>().ok().map(UpdateKind::EditedMessage)
}
"channel_post" => {
map.next_value::<Message>().ok().map(UpdateKind::ChannelPost)
}
"edited_channel_post" => {
map.next_value::<Message>().ok().map(UpdateKind::EditedChannelPost)
}
"inline_query" => {
map.next_value::<InlineQuery>().ok().map(UpdateKind::InlineQuery)
} }
"edited_message" => map
.next_value::<Message>()
.map(UpdateKind::EditedMessage)
.map_err(|_| false),
"channel_post" => map
.next_value::<Message>()
.map(UpdateKind::ChannelPost)
.map_err(|_| false),
"edited_channel_post" => map
.next_value::<Message>()
.map(UpdateKind::EditedChannelPost)
.map_err(|_| false),
"inline_query" => map
.next_value::<InlineQuery>()
.map(UpdateKind::InlineQuery)
.map_err(|_| false),
"chosen_inline_result" => map "chosen_inline_result" => map
.next_value::<ChosenInlineResult>() .next_value::<ChosenInlineResult>()
.map(UpdateKind::ChosenInlineResult) .ok()
.map_err(|_| false), .map(UpdateKind::ChosenInlineResult),
"callback_query" => map "callback_query" => {
.next_value::<CallbackQuery>() map.next_value::<CallbackQuery>().ok().map(UpdateKind::CallbackQuery)
.map(UpdateKind::CallbackQuery) }
.map_err(|_| false), "shipping_query" => {
"shipping_query" => map map.next_value::<ShippingQuery>().ok().map(UpdateKind::ShippingQuery)
.next_value::<ShippingQuery>() }
.map(UpdateKind::ShippingQuery)
.map_err(|_| false),
"pre_checkout_query" => map "pre_checkout_query" => map
.next_value::<PreCheckoutQuery>() .next_value::<PreCheckoutQuery>()
.map(UpdateKind::PreCheckoutQuery) .ok()
.map_err(|_| false), .map(UpdateKind::PreCheckoutQuery),
"poll" => map.next_value::<Poll>().map(UpdateKind::Poll).map_err(|_| false), "poll" => map.next_value::<Poll>().ok().map(UpdateKind::Poll),
"poll_answer" => map "poll_answer" => {
.next_value::<PollAnswer>() map.next_value::<PollAnswer>().ok().map(UpdateKind::PollAnswer)
.map(UpdateKind::PollAnswer) }
.map_err(|_| false), "my_chat_member" => {
"my_chat_member" => map map.next_value::<ChatMemberUpdated>().ok().map(UpdateKind::MyChatMember)
.next_value::<ChatMemberUpdated>() }
.map(UpdateKind::MyChatMember) "chat_member" => {
.map_err(|_| false), map.next_value::<ChatMemberUpdated>().ok().map(UpdateKind::ChatMember)
"chat_member" => map }
.next_value::<ChatMemberUpdated>()
.map(UpdateKind::ChatMember)
.map_err(|_| false),
"chat_join_request" => map "chat_join_request" => map
.next_value::<ChatJoinRequest>() .next_value::<ChatJoinRequest>()
.map(UpdateKind::ChatJoinRequest) .ok()
.map_err(|_| false), .map(UpdateKind::ChatJoinRequest),
_ => Some(empty_error()),
})
.unwrap_or_else(empty_error);
_ => Err(true), Ok(this)
};
let value_available = match res {
Ok(ok) => return Ok(ok),
Err(e) => e,
};
let mut value = serde_json::Map::new();
value.insert(
k.to_owned(),
if value_available {
map.next_value::<Value>().unwrap_or(Value::Null)
} else {
Value::Null
},
);
while let Ok(Some((k, v))) = map.next_entry::<_, Value>() {
value.insert(k, v);
}
return Ok(UpdateKind::Error(Value::Object(value)));
}
Ok(UpdateKind::Error(Value::Object(<_>::default())))
} }
} }
@ -319,6 +291,10 @@ impl Serialize for UpdateKind {
} }
} }
fn empty_error() -> UpdateKind {
UpdateKind::Error(Value::Object(<_>::default()))
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::types::{ use crate::types::{

View file

@ -9,7 +9,7 @@ use tokio::sync::mpsc;
use crate::{ use crate::{
requests::Requester, requests::Requester,
stop::StopFlag, stop::StopFlag,
types::Update, types::{Update, UpdateKind},
update_listeners::{webhooks::Options, UpdateListener}, update_listeners::{webhooks::Options, UpdateListener},
}; };
@ -186,8 +186,14 @@ pub fn axum_no_setup(
Some(tx) => tx, Some(tx) => tx,
}; };
match serde_json::from_str(&input) { match serde_json::from_str::<Update>(&input) {
Ok(update) => { Ok(mut update) => {
// See HACK comment in
// `teloxide_core::net::request::process_response::{closure#0}`
if let UpdateKind::Error(value) = &mut update.kind {
*value = serde_json::from_str(&input).unwrap_or_default();
}
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook")
} }
Err(error) => { Err(error) => {