mirror of
https://github.com/teloxide/teloxide.git
synced 2024-12-22 14:35:36 +01:00
Fix the storage persistency bug
This commit is contained in:
parent
afa9c8f8f5
commit
bf1ed601ac
6 changed files with 76 additions and 56 deletions
12
CHANGELOG.md
12
CHANGELOG.md
|
@ -6,6 +6,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## [unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
- `Storage::get_dialogue`
|
||||
|
||||
### Changed
|
||||
|
||||
- Do not return a dialogue from `Storage::{remove_dialogue, update_dialogue}`.
|
||||
|
||||
### Fixed
|
||||
|
||||
- A storage persistency bug ([issue 304](https://github.com/teloxide/teloxide/issues/304)).
|
||||
|
||||
### Fixed
|
||||
|
||||
- Remove `reqwest` dependency. It's not needed after the [teloxide-core] integration.
|
||||
|
|
|
@ -4,7 +4,7 @@ use crate::dispatching::{
|
|||
},
|
||||
DispatcherHandler, UpdateWithCx,
|
||||
};
|
||||
use std::{convert::Infallible, marker::PhantomData};
|
||||
use std::{convert::Infallible, fmt::Debug, marker::PhantomData};
|
||||
|
||||
use futures::{future::BoxFuture, FutureExt, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
|
@ -65,7 +65,7 @@ where
|
|||
Upd: GetChatId + Send + 'static,
|
||||
D: Default + Send + 'static,
|
||||
S: Storage<D> + Send + Sync + 'static,
|
||||
S::Error: Send + 'static,
|
||||
S::Error: Debug + Send + 'static,
|
||||
{
|
||||
/// Creates a dispatcher with the specified `handler` and `storage`.
|
||||
#[must_use]
|
||||
|
@ -97,18 +97,13 @@ where
|
|||
async move {
|
||||
let chat_id = cx.update.chat_id();
|
||||
|
||||
let dialogue = Arc::clone(&storage)
|
||||
.remove_dialogue(chat_id)
|
||||
.await
|
||||
.map(Option::unwrap_or_default);
|
||||
let dialogue =
|
||||
Arc::clone(&storage).get_dialogue(chat_id).await.map(Option::unwrap_or_default);
|
||||
|
||||
match handler.handle(DialogueWithCx { cx, dialogue }).await {
|
||||
DialogueStage::Next(new_dialogue) => {
|
||||
if let Ok(Some(_)) = storage.update_dialogue(chat_id, new_dialogue).await {
|
||||
panic!(
|
||||
"Oops, you have an bug in your Storage: update_dialogue returns \
|
||||
Some after remove_dialogue"
|
||||
);
|
||||
if let Err(e) = storage.update_dialogue(chat_id, new_dialogue).await {
|
||||
log::error!("Storage::update_dialogue failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
DialogueStage::Exit => {
|
||||
|
@ -117,8 +112,9 @@ where
|
|||
// sender right here:
|
||||
senders.remove(&chat_id);
|
||||
|
||||
// We already removed a dialogue from `storage` (see
|
||||
// the beginning of this async block).
|
||||
if let Err(e) = storage.remove_dialogue(chat_id).await {
|
||||
log::error!("Storage::remove_dialogue failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -134,7 +130,7 @@ where
|
|||
Upd: GetChatId + Send + 'static,
|
||||
D: Default + Send + 'static,
|
||||
S: Storage<D> + Send + Sync + 'static,
|
||||
S::Error: Send + 'static,
|
||||
S::Error: Debug + Send + 'static,
|
||||
R: Requester + Send,
|
||||
{
|
||||
fn handle(
|
||||
|
|
|
@ -25,27 +25,41 @@ impl<S> InMemStorage<S> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<D> Storage<D> for InMemStorage<D> {
|
||||
impl<D> Storage<D> for InMemStorage<D>
|
||||
where
|
||||
D: ToOwned<Owned = D>,
|
||||
D: Send + 'static,
|
||||
{
|
||||
type Error = std::convert::Infallible;
|
||||
|
||||
fn remove_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
|
||||
fn remove_dialogue(self: Arc<Self>, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
Box::pin(async move { Ok(self.map.lock().await.remove(&chat_id)) })
|
||||
Box::pin(async move {
|
||||
self.map.lock().await.remove(&chat_id);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn update_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
dialogue: D,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
Box::pin(async move { Ok(self.map.lock().await.insert(chat_id, dialogue)) })
|
||||
Box::pin(async move {
|
||||
self.map.lock().await.insert(chat_id, dialogue);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn get_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||
Box::pin(async move { Ok(self.map.lock().await.get(&chat_id).map(ToOwned::to_owned)) })
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,26 +42,26 @@ pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
|
|||
pub trait Storage<D> {
|
||||
type Error;
|
||||
|
||||
/// Removes a dialogue with the specified `chat_id`.
|
||||
///
|
||||
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
|
||||
/// `dialogue` was deleted.
|
||||
/// Removes a dialogue indexed by `chat_id`.
|
||||
fn remove_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static;
|
||||
|
||||
/// Updates a dialogue with the specified `chat_id`.
|
||||
///
|
||||
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
|
||||
/// `dialogue` was updated.
|
||||
/// Updates a dialogue indexed by `chat_id` with `dialogue`.
|
||||
fn update_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
dialogue: D,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static;
|
||||
|
||||
/// Provides a dialogue indexed by `chat_id`.
|
||||
fn get_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>;
|
||||
}
|
||||
|
|
|
@ -5,14 +5,13 @@ use std::{
|
|||
};
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use log::{log_enabled, trace, Level::Trace};
|
||||
|
||||
use crate::dispatching::dialogue::Storage;
|
||||
|
||||
/// Storage wrapper for logging purposes
|
||||
/// Storage wrapper for logging purposes.
|
||||
///
|
||||
/// Reports about any dialogue update or removal action on `trace` level
|
||||
/// using `log` crate.
|
||||
/// Reports about any dialogue action using the `trace` level in the `log`
|
||||
/// crate.
|
||||
pub struct TraceStorage<S> {
|
||||
inner: Arc<S>,
|
||||
}
|
||||
|
@ -35,14 +34,11 @@ where
|
|||
{
|
||||
type Error = <S as Storage<D>>::Error;
|
||||
|
||||
fn remove_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
|
||||
fn remove_dialogue(self: Arc<Self>, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
trace!("Removing dialogue with {}", chat_id);
|
||||
log::trace!("Removing dialogue #{}", chat_id);
|
||||
<S as Storage<D>>::remove_dialogue(self.inner.clone(), chat_id)
|
||||
}
|
||||
|
||||
|
@ -50,21 +46,23 @@ where
|
|||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
dialogue: D,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
if log_enabled!(Trace) {
|
||||
Box::pin(async move {
|
||||
let to = format!("{:#?}", dialogue);
|
||||
let from =
|
||||
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue)
|
||||
.await?;
|
||||
trace!("Updated dialogue with {}, {:#?} -> {}", chat_id, from, to);
|
||||
Ok(from)
|
||||
})
|
||||
} else {
|
||||
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue)
|
||||
}
|
||||
Box::pin(async move {
|
||||
let to = format!("{:#?}", dialogue);
|
||||
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue).await?;
|
||||
log::trace!("Updated a dialogue #{}: {:#?}", chat_id, to);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn get_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||
log::trace!("Requested a dialogue #{}", chat_id);
|
||||
<S as Storage<D>>::get_dialogue(self.inner.clone(), chat_id)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ use teloxide_core::{requests::Requester, types::Message};
|
|||
pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H)
|
||||
where
|
||||
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
|
||||
D: Default + Send + 'static,
|
||||
D: ToOwned<Owned = D> + Default + Send + 'static,
|
||||
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
|
||||
R: Requester + Send + Clone + 'static,
|
||||
<R as Requester>::GetUpdatesFaultTolerant: Send,
|
||||
|
@ -61,7 +61,7 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
|
|||
listener: L,
|
||||
) where
|
||||
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
|
||||
D: Default + Send + 'static,
|
||||
D: ToOwned<Owned = D> + Default + Send + 'static,
|
||||
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
|
||||
L: UpdateListener<ListenerE> + Send + 'a,
|
||||
ListenerE: Debug + Send + 'a,
|
||||
|
|
Loading…
Reference in a new issue