Merge pull request #356 from teloxide/fix-storage-persistency

Fix the storage persistency bug
This commit is contained in:
Waffle Lapkin 2021-03-29 09:33:56 +03:00 committed by GitHub
commit b150af772a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 173 additions and 129 deletions

View file

@ -6,13 +6,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [unreleased] ## [unreleased]
### Added
- `Storage::get_dialogue`
### Changed ### Changed
- Do not return a dialogue from `Storage::{remove_dialogue, update_dialogue}`.
- Require `D: Clone` in `dialogues_repl(_with_listener)` and `InMemStorage`.
- Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)).
### Fixed ### Fixed
- Remove `reqwest` dependency. It's not needed after the [teloxide-core] integration. - Remove the `reqwest` dependency. It's not needed after the [teloxide-core] integration.
- A storage persistency bug ([issue 304](https://github.com/teloxide/teloxide/issues/304)).
- Log errors from `Storage::{remove_dialogue, update_dialogue}` in `DialogueDispatcher` ([issue 302](https://github.com/teloxide/teloxide/issues/302)).
## [0.4.0] - 2021-03-22 ## [0.4.0] - 2021-03-22

View file

@ -6,7 +6,7 @@ use crate::dialogue::states::{
use derive_more::From; use derive_more::From;
use teloxide::macros::Transition; use teloxide::macros::Transition;
#[derive(Transition, From)] #[derive(Transition, Clone, From)]
pub enum Dialogue { pub enum Dialogue {
Start(StartState), Start(StartState),
ReceiveFullName(ReceiveFullNameState), ReceiveFullName(ReceiveFullNameState),

View file

@ -1,7 +1,7 @@
use crate::dialogue::{states::receive_location::ReceiveLocationState, Dialogue}; use crate::dialogue::{states::receive_location::ReceiveLocationState, Dialogue};
use teloxide::prelude::*; use teloxide::prelude::*;
#[derive(Generic)] #[derive(Clone, Generic)]
pub struct ReceiveAgeState { pub struct ReceiveAgeState {
pub full_name: String, pub full_name: String,
} }

View file

@ -1,7 +1,7 @@
use crate::dialogue::{states::receive_age::ReceiveAgeState, Dialogue}; use crate::dialogue::{states::receive_age::ReceiveAgeState, Dialogue};
use teloxide::prelude::*; use teloxide::prelude::*;
#[derive(Generic)] #[derive(Clone, Generic)]
pub struct ReceiveFullNameState; pub struct ReceiveFullNameState;
#[teloxide(subtransition)] #[teloxide(subtransition)]

View file

@ -1,7 +1,7 @@
use crate::dialogue::Dialogue; use crate::dialogue::Dialogue;
use teloxide::prelude::*; use teloxide::prelude::*;
#[derive(Generic)] #[derive(Clone, Generic)]
pub struct ReceiveLocationState { pub struct ReceiveLocationState {
pub full_name: String, pub full_name: String,
pub age: u8, pub age: u8,

View file

@ -1,6 +1,7 @@
use crate::dialogue::{states::ReceiveFullNameState, Dialogue}; use crate::dialogue::{states::ReceiveFullNameState, Dialogue};
use teloxide::prelude::*; use teloxide::prelude::*;
#[derive(Clone)]
pub struct StartState; pub struct StartState;
#[teloxide(subtransition)] #[teloxide(subtransition)]

View file

@ -4,7 +4,7 @@ use crate::dispatching::{
}, },
DispatcherHandler, UpdateWithCx, DispatcherHandler, UpdateWithCx,
}; };
use std::{convert::Infallible, marker::PhantomData}; use std::{convert::Infallible, fmt::Debug, marker::PhantomData};
use futures::{future::BoxFuture, FutureExt, StreamExt}; use futures::{future::BoxFuture, FutureExt, StreamExt};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -19,6 +19,11 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// Note that it implements [`DispatcherHandler`], so you can just put an /// Note that it implements [`DispatcherHandler`], so you can just put an
/// instance of this dispatcher into the [`Dispatcher`]'s methods. /// instance of this dispatcher into the [`Dispatcher`]'s methods.
/// ///
/// Note that when the storage methods [`Storage::remove_dialogue`] and
/// [`Storage::update_dialogue`] are failed, the errors are logged, but a result
/// from [`Storage::get_dialogue`] is provided to a user handler as-is so you
/// can respond to a concrete user with an error description.
///
/// See the [module-level documentation](crate::dispatching::dialogue) for the /// See the [module-level documentation](crate::dispatching::dialogue) for the
/// design overview. /// design overview.
/// ///
@ -65,7 +70,7 @@ where
Upd: GetChatId + Send + 'static, Upd: GetChatId + Send + 'static,
D: Default + Send + 'static, D: Default + Send + 'static,
S: Storage<D> + Send + Sync + 'static, S: Storage<D> + Send + Sync + 'static,
S::Error: Send + 'static, S::Error: Debug + Send + 'static,
{ {
/// Creates a dispatcher with the specified `handler` and `storage`. /// Creates a dispatcher with the specified `handler` and `storage`.
#[must_use] #[must_use]
@ -97,18 +102,13 @@ where
async move { async move {
let chat_id = cx.update.chat_id(); let chat_id = cx.update.chat_id();
let dialogue = Arc::clone(&storage) let dialogue =
.remove_dialogue(chat_id) Arc::clone(&storage).get_dialogue(chat_id).await.map(Option::unwrap_or_default);
.await
.map(Option::unwrap_or_default);
match handler.handle(DialogueWithCx { cx, dialogue }).await { match handler.handle(DialogueWithCx { cx, dialogue }).await {
DialogueStage::Next(new_dialogue) => { DialogueStage::Next(new_dialogue) => {
if let Ok(Some(_)) = storage.update_dialogue(chat_id, new_dialogue).await { if let Err(e) = storage.update_dialogue(chat_id, new_dialogue).await {
panic!( log::error!("Storage::update_dialogue failed: {:?}", e);
"Oops, you have an bug in your Storage: update_dialogue returns \
Some after remove_dialogue"
);
} }
} }
DialogueStage::Exit => { DialogueStage::Exit => {
@ -117,8 +117,9 @@ where
// sender right here: // sender right here:
senders.remove(&chat_id); senders.remove(&chat_id);
// We already removed a dialogue from `storage` (see if let Err(e) = storage.remove_dialogue(chat_id).await {
// the beginning of this async block). log::error!("Storage::remove_dialogue failed: {:?}", e);
}
} }
} }
} }
@ -134,7 +135,7 @@ where
Upd: GetChatId + Send + 'static, Upd: GetChatId + Send + 'static,
D: Default + Send + 'static, D: Default + Send + 'static,
S: Storage<D> + Send + Sync + 'static, S: Storage<D> + Send + Sync + 'static,
S::Error: Send + 'static, S::Error: Debug + Send + 'static,
R: Requester + Send, R: Requester + Send,
{ {
fn handle( fn handle(

View file

@ -35,8 +35,11 @@
//! //!
//! use teloxide::{dispatching::dialogue::Transition, prelude::*, teloxide, RequestError}; //! use teloxide::{dispatching::dialogue::Transition, prelude::*, teloxide, RequestError};
//! //!
//! #[derive(Clone)]
//! struct _1State; //! struct _1State;
//! #[derive(Clone)]
//! struct _2State; //! struct _2State;
//! #[derive(Clone)]
//! struct _3State; //! struct _3State;
//! //!
//! type Out = TransitionOut<D, RequestError>; //! type Out = TransitionOut<D, RequestError>;
@ -56,7 +59,7 @@
//! todo!() //! todo!()
//! } //! }
//! //!
//! #[derive(Transition)] //! #[derive(Clone, Transition)]
//! enum D { //! enum D {
//! _1(_1State), //! _1(_1State),
//! _2(_2State), //! _2(_2State),

View file

@ -25,27 +25,41 @@ impl<S> InMemStorage<S> {
} }
} }
impl<D> Storage<D> for InMemStorage<D> { impl<D> Storage<D> for InMemStorage<D>
where
D: Clone,
D: Send + 'static,
{
type Error = std::convert::Infallible; type Error = std::convert::Infallible;
fn remove_dialogue( fn remove_dialogue(self: Arc<Self>, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>>
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
where where
D: Send + 'static, D: Send + 'static,
{ {
Box::pin(async move { Ok(self.map.lock().await.remove(&chat_id)) }) Box::pin(async move {
self.map.lock().await.remove(&chat_id);
Ok(())
})
} }
fn update_dialogue( fn update_dialogue(
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
dialogue: D, dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> ) -> BoxFuture<'static, Result<(), Self::Error>>
where where
D: Send + 'static, D: Send + 'static,
{ {
Box::pin(async move { Ok(self.map.lock().await.insert(chat_id, dialogue)) }) Box::pin(async move {
self.map.lock().await.insert(chat_id, dialogue);
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move { Ok(self.map.lock().await.get(&chat_id).map(ToOwned::to_owned)) })
} }
} }

View file

@ -42,26 +42,26 @@ pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
pub trait Storage<D> { pub trait Storage<D> {
type Error; type Error;
/// Removes a dialogue with the specified `chat_id`. /// Removes a dialogue indexed by `chat_id`.
///
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
/// `dialogue` was deleted.
fn remove_dialogue( fn remove_dialogue(
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> ) -> BoxFuture<'static, Result<(), Self::Error>>
where where
D: Send + 'static; D: Send + 'static;
/// Updates a dialogue with the specified `chat_id`. /// Updates a dialogue indexed by `chat_id` with `dialogue`.
///
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
/// `dialogue` was updated.
fn update_dialogue( fn update_dialogue(
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
dialogue: D, dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> ) -> BoxFuture<'static, Result<(), Self::Error>>
where where
D: Send + 'static; D: Send + 'static;
/// Provides a dialogue indexed by `chat_id`.
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>;
} }

View file

@ -56,7 +56,7 @@ where
fn remove_dialogue( fn remove_dialogue(
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { ) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move { Box::pin(async move {
let res = redis::pipe() let res = redis::pipe()
.atomic() .atomic()
@ -70,13 +70,14 @@ where
// bulk, so all other branches should be unreachable // bulk, so all other branches should be unreachable
match res { match res {
redis::Value::Bulk(bulk) if bulk.len() == 1 => { redis::Value::Bulk(bulk) if bulk.len() == 1 => {
Ok(Option::<Vec<u8>>::from_redis_value(&bulk[0])? Option::<Vec<u8>>::from_redis_value(&bulk[0])?
.map(|v| { .map(|v| {
self.serializer self.serializer
.deserialize(&v) .deserialize(&v)
.map_err(RedisStorageError::SerdeError) .map_err(RedisStorageError::SerdeError)
}) })
.transpose()?) .transpose()?;
Ok(())
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -87,14 +88,24 @@ where
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
dialogue: D, dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { ) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move { Box::pin(async move {
let dialogue = let dialogue =
self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?; self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?;
self.conn.lock().await.set::<_, Vec<u8>, _>(chat_id, dialogue).await?;
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
self.conn self.conn
.lock() .lock()
.await .await
.getset::<_, Vec<u8>, Option<Vec<u8>>>(chat_id, dialogue) .get::<_, Option<Vec<u8>>>(chat_id)
.await? .await?
.map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError)) .map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError))
.transpose() .transpose()

View file

@ -63,20 +63,16 @@ where
fn remove_dialogue( fn remove_dialogue(
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { ) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move { Box::pin(async move {
Ok(match get_dialogue(&self.pool, chat_id).await? { if get_dialogue(&self.pool, chat_id).await?.is_some() {
Some(d) => { sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?")
let prev_dialogue = .bind(chat_id)
self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)?; .execute(&self.pool)
sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?") .await?;
.bind(chat_id) }
.execute(&self.pool)
.await?; Ok(())
Some(prev_dialogue)
}
_ => None,
})
}) })
} }
@ -84,14 +80,10 @@ where
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
dialogue: D, dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { ) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move { Box::pin(async move {
let prev_dialogue = get_dialogue(&self.pool, chat_id) let d = self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?;
.await?
.map(|d| self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError))
.transpose()?;
let upd_dialogue =
self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?;
self.pool self.pool
.acquire() .acquire()
.await? .await?
@ -103,10 +95,22 @@ where
"#, "#,
) )
.bind(chat_id) .bind(chat_id)
.bind(upd_dialogue), .bind(d),
) )
.await?; .await?;
Ok(prev_dialogue) Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
get_dialogue(&self.pool, chat_id)
.await?
.map(|d| self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError))
.transpose()
}) })
} }
} }

View file

@ -5,14 +5,13 @@ use std::{
}; };
use futures::future::BoxFuture; use futures::future::BoxFuture;
use log::{log_enabled, trace, Level::Trace};
use crate::dispatching::dialogue::Storage; use crate::dispatching::dialogue::Storage;
/// Storage wrapper for logging purposes /// Storage wrapper for logging purposes.
/// ///
/// Reports about any dialogue update or removal action on `trace` level /// Reports about any dialogue action using the `trace` level in the `log`
/// using `log` crate. /// crate.
pub struct TraceStorage<S> { pub struct TraceStorage<S> {
inner: Arc<S>, inner: Arc<S>,
} }
@ -35,14 +34,11 @@ where
{ {
type Error = <S as Storage<D>>::Error; type Error = <S as Storage<D>>::Error;
fn remove_dialogue( fn remove_dialogue(self: Arc<Self>, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>>
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
where where
D: Send + 'static, D: Send + 'static,
{ {
trace!("Removing dialogue with {}", chat_id); log::trace!("Removing dialogue #{}", chat_id);
<S as Storage<D>>::remove_dialogue(self.inner.clone(), chat_id) <S as Storage<D>>::remove_dialogue(self.inner.clone(), chat_id)
} }
@ -50,21 +46,23 @@ where
self: Arc<Self>, self: Arc<Self>,
chat_id: i64, chat_id: i64,
dialogue: D, dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> ) -> BoxFuture<'static, Result<(), Self::Error>>
where where
D: Send + 'static, D: Send + 'static,
{ {
if log_enabled!(Trace) { Box::pin(async move {
Box::pin(async move { let to = format!("{:#?}", dialogue);
let to = format!("{:#?}", dialogue); <S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue).await?;
let from = log::trace!("Updated a dialogue #{}: {:#?}", chat_id, to);
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue) Ok(())
.await?; })
trace!("Updated dialogue with {}, {:#?} -> {}", chat_id, from, to); }
Ok(from)
}) fn get_dialogue(
} else { self: Arc<Self>,
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue) chat_id: i64,
} ) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
log::trace!("Requested a dialogue #{}", chat_id);
<S as Storage<D>>::get_dialogue(self.inner.clone(), chat_id)
} }
} }

View file

@ -26,7 +26,7 @@ use teloxide_core::{requests::Requester, types::Message};
pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H) pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H)
where where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static, H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Default + Send + 'static, D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static, Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static, R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send, <R as Requester>::GetUpdatesFaultTolerant: Send,
@ -61,7 +61,7 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
listener: L, listener: L,
) where ) where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static, H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Default + Send + 'static, D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static, Fut: Future<Output = DialogueStage<D>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a, L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a, ListenerE: Debug + Send + 'a,

View file

@ -1,6 +1,5 @@
use std::{ use std::{
fmt::{Debug, Display}, fmt::{Debug, Display},
future::Future,
sync::Arc, sync::Arc,
}; };
use teloxide::dispatching::dialogue::{RedisStorage, Serializer, Storage}; use teloxide::dispatching::dialogue::{RedisStorage, Serializer, Storage};
@ -40,32 +39,35 @@ async fn test_redis_cbor() {
type Dialogue = String; type Dialogue = String;
macro_rules! test_dialogues {
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
assert_eq!(Arc::clone(&$storage).get_dialogue(1).await.unwrap(), $_0);
assert_eq!(Arc::clone(&$storage).get_dialogue(11).await.unwrap(), $_1);
assert_eq!(Arc::clone(&$storage).get_dialogue(256).await.unwrap(), $_2);
};
}
async fn test_redis<S>(storage: Arc<RedisStorage<S>>) async fn test_redis<S>(storage: Arc<RedisStorage<S>>)
where where
S: Send + Sync + Serializer<Dialogue> + 'static, S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display, <S as Serializer<Dialogue>>::Error: Debug + Display,
{ {
check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await; test_dialogues!(storage, None, None, None);
check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await;
// 1 - ABC, 11 - DEF, 256 - GHI Arc::clone(&storage).update_dialogue(1, "ABC".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(11, "DEF".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(256, "GHI".to_owned()).await.unwrap();
check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await; test_dialogues!(
check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await; storage,
Some("ABC".to_owned()),
Some("DEF".to_owned()),
Some("GHI".to_owned())
);
// 1 - GKL, 11 - DEF, 256 - MNO Arc::clone(&storage).remove_dialogue(1).await.unwrap();
Arc::clone(&storage).remove_dialogue(11).await.unwrap();
Arc::clone(&storage).remove_dialogue(256).await.unwrap();
check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await; test_dialogues!(storage, None, None, None);
check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await;
check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await;
}
async fn check_dialogue<E>(
expected: impl Into<Option<&str>>,
actual: impl Future<Output = Result<Option<Dialogue>, E>>,
) where
E: Debug,
{
assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap())
} }

View file

@ -1,6 +1,5 @@
use std::{ use std::{
fmt::{Debug, Display}, fmt::{Debug, Display},
future::Future,
sync::Arc, sync::Arc,
}; };
use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, Storage}; use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, Storage};
@ -36,32 +35,35 @@ async fn test_sqlite_cbor() {
type Dialogue = String; type Dialogue = String;
macro_rules! test_dialogues {
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
assert_eq!(Arc::clone(&$storage).get_dialogue(1).await.unwrap(), $_0);
assert_eq!(Arc::clone(&$storage).get_dialogue(11).await.unwrap(), $_1);
assert_eq!(Arc::clone(&$storage).get_dialogue(256).await.unwrap(), $_2);
};
}
async fn test_sqlite<S>(storage: Arc<SqliteStorage<S>>) async fn test_sqlite<S>(storage: Arc<SqliteStorage<S>>)
where where
S: Send + Sync + Serializer<Dialogue> + 'static, S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display, <S as Serializer<Dialogue>>::Error: Debug + Display,
{ {
check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await; test_dialogues!(storage, None, None, None);
check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await;
// 1 - ABC, 11 - DEF, 256 - GHI Arc::clone(&storage).update_dialogue(1, "ABC".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(11, "DEF".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(256, "GHI".to_owned()).await.unwrap();
check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await; test_dialogues!(
check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await; storage,
Some("ABC".to_owned()),
Some("DEF".to_owned()),
Some("GHI".to_owned())
);
// 1 - GKL, 11 - DEF, 256 - MNO Arc::clone(&storage).remove_dialogue(1).await.unwrap();
Arc::clone(&storage).remove_dialogue(11).await.unwrap();
Arc::clone(&storage).remove_dialogue(256).await.unwrap();
check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await; test_dialogues!(storage, None, None, None);
check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await;
check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await;
}
async fn check_dialogue<E>(
expected: impl Into<Option<&str>>,
actual: impl Future<Output = Result<Option<Dialogue>, E>>,
) where
E: Debug,
{
assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap())
} }