Merge pull request #826 from teloxide/better_error_workaround

Rewrite hacks for reliable `Update` deserialization
This commit is contained in:
Sima Kinsart 2023-02-15 12:01:18 +00:00 committed by GitHub
commit 42514e93db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 240 additions and 85 deletions

View file

@ -10,8 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed ### Fixed
- `Update::user` now handles channel posts, chat member changes and chat join request updates correctly ([#835][pr835]) - `Update::user` now handles channel posts, chat member changes and chat join request updates correctly ([#835][pr835])
- In cases when `teloxide` can't deserialize an update, the error now includes the full JSON value ([#826][pr826])
[pr835]: https://github.com/teloxide/teloxide/pull/835 [pr835]: https://github.com/teloxide/teloxide/pull/835
[pr826]: https://github.com/teloxide/teloxide/pull/826
## 0.9.0 - 2023-01-17 ## 0.9.0 - 2023-01-17

View file

@ -206,7 +206,7 @@ impl Bot {
) -> impl Future<Output = ResponseResult<P::Output>> + 'static ) -> impl Future<Output = ResponseResult<P::Output>> + 'static
where where
P: Payload + Serialize, P: Payload + Serialize,
P::Output: DeserializeOwned, P::Output: DeserializeOwned + 'static,
{ {
let client = self.client.clone(); let client = self.client.clone();
let token = Arc::clone(&self.token); let token = Arc::clone(&self.token);
@ -237,7 +237,7 @@ impl Bot {
) -> impl Future<Output = ResponseResult<P::Output>> ) -> impl Future<Output = ResponseResult<P::Output>>
where where
P: MultipartPayload + Serialize, P: MultipartPayload + Serialize,
P::Output: DeserializeOwned, P::Output: DeserializeOwned + 'static,
{ {
let client = self.client.clone(); let client = self.client.clone();
let token = Arc::clone(&self.token); let token = Arc::clone(&self.token);
@ -267,7 +267,7 @@ impl Bot {
) -> impl Future<Output = ResponseResult<P::Output>> ) -> impl Future<Output = ResponseResult<P::Output>>
where where
P: MultipartPayload + Serialize, P: MultipartPayload + Serialize,
P::Output: DeserializeOwned, P::Output: DeserializeOwned + 'static,
{ {
let client = self.client.clone(); let client = self.client.clone();
let token = Arc::clone(&self.token); let token = Arc::clone(&self.token);

View file

@ -17,6 +17,7 @@ pub enum RequestError {
/// The group has been migrated to a supergroup with the specified /// The group has been migrated to a supergroup with the specified
/// identifier. /// identifier.
#[error("The group has been migrated to a supergroup with ID #{0}")] #[error("The group has been migrated to a supergroup with ID #{0}")]
// FIXME: change to `ChatId` :|
MigrateToChatId(i64), MigrateToChatId(i64),
/// In case of exceeding flood control, the number of seconds left to wait /// In case of exceeding flood control, the number of seconds left to wait

View file

@ -1,4 +1,4 @@
use std::time::Duration; use std::{any::TypeId, time::Duration};
use reqwest::{ use reqwest::{
header::{HeaderValue, CONTENT_TYPE}, header::{HeaderValue, CONTENT_TYPE},
@ -19,7 +19,7 @@ pub async fn request_multipart<T>(
_timeout_hint: Option<Duration>, _timeout_hint: Option<Duration>,
) -> ResponseResult<T> ) -> ResponseResult<T>
where where
T: DeserializeOwned, T: DeserializeOwned + 'static,
{ {
// Workaround for [#460] // Workaround for [#460]
// //
@ -58,7 +58,7 @@ pub async fn request_json<T>(
_timeout_hint: Option<Duration>, _timeout_hint: Option<Duration>,
) -> ResponseResult<T> ) -> ResponseResult<T>
where where
T: DeserializeOwned, T: DeserializeOwned + 'static,
{ {
// Workaround for [#460] // Workaround for [#460]
// //
@ -91,7 +91,7 @@ where
async fn process_response<T>(response: Response) -> ResponseResult<T> async fn process_response<T>(response: Response) -> ResponseResult<T>
where where
T: DeserializeOwned, T: DeserializeOwned + 'static,
{ {
if response.status().is_server_error() { if response.status().is_server_error() {
tokio::time::sleep(DELAY_ON_SERVER_ERROR).await; tokio::time::sleep(DELAY_ON_SERVER_ERROR).await;
@ -99,7 +99,176 @@ where
let text = response.text().await?; let text = response.text().await?;
deserialize_response(text)
}
fn deserialize_response<T>(text: String) -> Result<T, RequestError>
where
T: DeserializeOwned + 'static,
{
serde_json::from_str::<TelegramResponse<T>>(&text) serde_json::from_str::<TelegramResponse<T>>(&text)
.map(|mut response| {
use crate::types::{Update, UpdateKind};
use std::{any::Any, iter::zip};
// HACK: Fill-in error information into `UpdateKind::Error`.
//
// Why? Well, we need `Update` deserialization to be reliable,
// even if Telegram breaks something in their Bot API, we want
// 1. Deserialization to """succeed"""
// 2. Get the `update.id`
//
// Both of these points are required for `get_updates(...) -> Vec<Update>`
// to behave well after Telegram introduces updates that we can't parse.
// (1.) makes it so only some of the updates in a batch need to be skipped
// (otherwise serde'll stop on the first error). (2.) allows us to issue
// the next `get_updates` call with the right offset, even if the last
// update in the batch didn't deserialize well.
//
// serde's interface doesn't allow us to implement `Deserialize` in such
// a way, that we could keep the data we couldn't parse, so our
// `Deserialize` impl for `UpdateKind` just returns
// `UpdateKind::Error(/* some empty-ish value */)`. Here, through some
// terrible hacks and downcasting, we fill-in the data we couldn't parse
// so that our users can make actionable bug reports.
//
// We specifically handle `Vec<Update>` here, because that's the return
// type of the only method that returns updates.
if TypeId::of::<T>() == TypeId::of::<Vec<Update>>() {
if let TelegramResponse::Ok { response, .. } = &mut response {
if let Some(updates) =
(response as &mut T as &mut dyn Any).downcast_mut::<Vec<Update>>()
{
if updates.iter().any(|u| matches!(u.kind, UpdateKind::Error(_))) {
let re_parsed = serde_json::from_str(&text);
if let Ok(TelegramResponse::Ok { response: values, .. }) = re_parsed {
for (update, value) in zip::<_, Vec<_>>(updates, values) {
if let UpdateKind::Error(dest) = &mut update.kind {
*dest = value;
}
}
}
}
}
}
}
response
})
.map_err(|source| RequestError::InvalidJson { source, raw: text.into() })? .map_err(|source| RequestError::InvalidJson { source, raw: text.into() })?
.into() .into()
} }
// Unit tests for `deserialize_response`: each test feeds a raw JSON response
// body to the function and checks the resulting value or error.
#[cfg(test)]
mod tests {
use std::time::Duration;
use cool_asserts::assert_matches;
use crate::{
net::request::deserialize_response,
types::{True, Update, UpdateKind},
ApiError, RequestError,
};
/// A response with `"ok":true` and `"result":true` deserializes to `Ok(True)`.
#[test]
fn smoke_ok() {
let json = r#"{"ok":true,"result":true}"#.to_owned();
let res = deserialize_response::<True>(json);
assert_matches!(res, Ok(True));
}
/// A response with `"ok":false` and the "bot was blocked" description is
/// reported as `RequestError::Api(ApiError::BotBlocked)`.
#[test]
fn smoke_err() {
let json =
r#"{"ok":false,"description":"Forbidden: bot was blocked by the user"}"#.to_owned();
let res = deserialize_response::<True>(json);
assert_matches!(res, Err(RequestError::Api(ApiError::BotBlocked)));
}
/// A failed response carrying `parameters.migrate_to_chat_id` is reported as
/// `RequestError::MigrateToChatId` with that identifier.
#[test]
fn migrate() {
let json = r#"{"ok":false,"description":"this string is ignored","parameters":{"migrate_to_chat_id":123456}}"#.to_owned();
let res = deserialize_response::<True>(json);
assert_matches!(res, Err(RequestError::MigrateToChatId(123456)));
}
/// A failed response carrying `parameters.retry_after` is reported as
/// `RequestError::RetryAfter` with the value interpreted as seconds.
#[test]
fn retry_after() {
let json = r#"{"ok":false,"description":"this string is ignored","parameters":{"retry_after":123456}}"#.to_owned();
let res = deserialize_response::<True>(json);
assert_matches!(res, Err(RequestError::RetryAfter(duration)) if duration == Duration::from_secs(123456));
}
/// A batch containing a single well-formed `poll_answer` update deserializes
/// to one `Update` with id 0 and kind `UpdateKind::PollAnswer`.
#[test]
fn update_ok() {
let json = r#"{
"ok":true,
"result":[
{
"update_id":0,
"poll_answer":{
"poll_id":"POLL_ID",
"user": {"id":42,"is_bot":false,"first_name":"blah"},
"option_ids": []
}
}
]
}"#
.to_owned();
let res = deserialize_response::<Vec<Update>>(json).unwrap();
assert_matches!(res, [Update { id: 0, kind: UpdateKind::PollAnswer(_) }]);
}
/// Check that `get_updates` can work with malformed updates.
///
/// The batch mixes two well-formed `poll_answer` updates with one update of
/// an unknown kind and one `message` with missing fields. The whole batch
/// must still deserialize, every update must keep its id, and the two
/// unparsable updates must come back as `UpdateKind::Error` holding a JSON
/// object.
#[test]
fn update_err() {
let json = r#"{
"ok":true,
"result":[
{
"update_id":0,
"poll_answer":{
"poll_id":"POLL_ID",
"user": {"id":42,"is_bot":false,"first_name":"blah"},
"option_ids": []
}
},
{
"update_id":1,
"something unknown to us":17
},
{
"update_id":2,
"poll_answer":{
"poll_id":"POLL_ID",
"user": {"id":42,"is_bot":false,"first_name":"blah"},
"option_ids": [3, 4, 8]
}
},
{
"update_id":3,
"message":{"some fields are missing":true}
}
]
}"#
.to_owned();
let res = deserialize_response::<Vec<Update>>(json).unwrap();
assert_matches!(
res,
[
Update { id: 0, kind: UpdateKind::PollAnswer(_) },
Update { id: 1, kind: UpdateKind::Error(v) } if v.is_object(),
Update { id: 2, kind: UpdateKind::PollAnswer(_) },
Update { id: 3, kind: UpdateKind::Error(v) } if v.is_object(),
]
);
}
}

View file

@ -156,7 +156,10 @@ pub enum UpdateKind {
/// An error that happened during deserialization. /// An error that happened during deserialization.
/// ///
/// This allows `teloxide` to continue working even if telegram adds a new /// This allows `teloxide` to continue working even if telegram adds a new
/// kind of updates. /// kinds of updates.
///
/// **Note that the `Deserialize` implementation always returns an empty
/// value**; teloxide fills in the data afterwards, once the update has been deserialized.
Error(Value), Error(Value),
} }
@ -182,94 +185,63 @@ impl<'de> Deserialize<'de> for UpdateKind {
// Try to deserialize a borrowed-str key, or else try deserializing an owned // Try to deserialize a borrowed-str key, or else try deserializing an owned
// string key // string key
let k = map.next_key::<&str>().or_else(|_| { let key = map.next_key::<&str>().or_else(|_| {
map.next_key::<String>().map(|k| { map.next_key::<String>().map(|k| {
tmp = k; tmp = k;
tmp.as_deref() tmp.as_deref()
}) })
}); });
if let Ok(Some(k)) = k { let this = key
let res = match k { .ok()
"message" => { .flatten()
map.next_value::<Message>().map(UpdateKind::Message).map_err(|_| false) .and_then(|key| match key {
"message" => map.next_value::<Message>().ok().map(UpdateKind::Message),
"edited_message" => {
map.next_value::<Message>().ok().map(UpdateKind::EditedMessage)
}
"channel_post" => {
map.next_value::<Message>().ok().map(UpdateKind::ChannelPost)
}
"edited_channel_post" => {
map.next_value::<Message>().ok().map(UpdateKind::EditedChannelPost)
}
"inline_query" => {
map.next_value::<InlineQuery>().ok().map(UpdateKind::InlineQuery)
} }
"edited_message" => map
.next_value::<Message>()
.map(UpdateKind::EditedMessage)
.map_err(|_| false),
"channel_post" => map
.next_value::<Message>()
.map(UpdateKind::ChannelPost)
.map_err(|_| false),
"edited_channel_post" => map
.next_value::<Message>()
.map(UpdateKind::EditedChannelPost)
.map_err(|_| false),
"inline_query" => map
.next_value::<InlineQuery>()
.map(UpdateKind::InlineQuery)
.map_err(|_| false),
"chosen_inline_result" => map "chosen_inline_result" => map
.next_value::<ChosenInlineResult>() .next_value::<ChosenInlineResult>()
.map(UpdateKind::ChosenInlineResult) .ok()
.map_err(|_| false), .map(UpdateKind::ChosenInlineResult),
"callback_query" => map "callback_query" => {
.next_value::<CallbackQuery>() map.next_value::<CallbackQuery>().ok().map(UpdateKind::CallbackQuery)
.map(UpdateKind::CallbackQuery) }
.map_err(|_| false), "shipping_query" => {
"shipping_query" => map map.next_value::<ShippingQuery>().ok().map(UpdateKind::ShippingQuery)
.next_value::<ShippingQuery>() }
.map(UpdateKind::ShippingQuery)
.map_err(|_| false),
"pre_checkout_query" => map "pre_checkout_query" => map
.next_value::<PreCheckoutQuery>() .next_value::<PreCheckoutQuery>()
.map(UpdateKind::PreCheckoutQuery) .ok()
.map_err(|_| false), .map(UpdateKind::PreCheckoutQuery),
"poll" => map.next_value::<Poll>().map(UpdateKind::Poll).map_err(|_| false), "poll" => map.next_value::<Poll>().ok().map(UpdateKind::Poll),
"poll_answer" => map "poll_answer" => {
.next_value::<PollAnswer>() map.next_value::<PollAnswer>().ok().map(UpdateKind::PollAnswer)
.map(UpdateKind::PollAnswer) }
.map_err(|_| false), "my_chat_member" => {
"my_chat_member" => map map.next_value::<ChatMemberUpdated>().ok().map(UpdateKind::MyChatMember)
.next_value::<ChatMemberUpdated>() }
.map(UpdateKind::MyChatMember) "chat_member" => {
.map_err(|_| false), map.next_value::<ChatMemberUpdated>().ok().map(UpdateKind::ChatMember)
"chat_member" => map }
.next_value::<ChatMemberUpdated>()
.map(UpdateKind::ChatMember)
.map_err(|_| false),
"chat_join_request" => map "chat_join_request" => map
.next_value::<ChatJoinRequest>() .next_value::<ChatJoinRequest>()
.map(UpdateKind::ChatJoinRequest) .ok()
.map_err(|_| false), .map(UpdateKind::ChatJoinRequest),
_ => Some(empty_error()),
})
.unwrap_or_else(empty_error);
_ => Err(true), Ok(this)
};
let value_available = match res {
Ok(ok) => return Ok(ok),
Err(e) => e,
};
let mut value = serde_json::Map::new();
value.insert(
k.to_owned(),
if value_available {
map.next_value::<Value>().unwrap_or(Value::Null)
} else {
Value::Null
},
);
while let Ok(Some((k, v))) = map.next_entry::<_, Value>() {
value.insert(k, v);
}
return Ok(UpdateKind::Error(Value::Object(value)));
}
Ok(UpdateKind::Error(Value::Object(<_>::default())))
} }
} }
@ -319,6 +291,10 @@ impl Serialize for UpdateKind {
} }
} }
fn empty_error() -> UpdateKind {
UpdateKind::Error(Value::Object(<_>::default()))
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::types::{ use crate::types::{

View file

@ -9,7 +9,7 @@ use tokio::sync::mpsc;
use crate::{ use crate::{
requests::Requester, requests::Requester,
stop::StopFlag, stop::StopFlag,
types::Update, types::{Update, UpdateKind},
update_listeners::{webhooks::Options, UpdateListener}, update_listeners::{webhooks::Options, UpdateListener},
}; };
@ -186,8 +186,14 @@ pub fn axum_no_setup(
Some(tx) => tx, Some(tx) => tx,
}; };
match serde_json::from_str(&input) { match serde_json::from_str::<Update>(&input) {
Ok(update) => { Ok(mut update) => {
// See HACK comment in
// `teloxide_core::net::request::deserialize_response::{closure#0}`
if let UpdateKind::Error(value) = &mut update.kind {
*value = serde_json::from_str(&input).unwrap_or_default();
}
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook")
} }
Err(error) => { Err(error) => {