mirror of
https://github.com/teloxide/teloxide.git
synced 2025-01-08 19:33:53 +01:00
Remove rocksdb storage
This commit is contained in:
parent
0df19e5fc9
commit
ba8c316a4a
6 changed files with 12 additions and 224 deletions
|
@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## unreleased
|
||||
|
||||
## Removed
|
||||
|
||||
- `rocksdb-storage` feature and associated items [**BC**]
|
||||
|
||||
## 0.11.1 - 2022-10-31
|
||||
|
||||
### Added
|
||||
|
|
|
@ -1,6 +1,14 @@
|
|||
This document describes breaking changes of `teloxide` crate, as well as the ways to update code.
|
||||
Note that the list of required changes is not fully exhaustive and it may lack something in rare cases.
|
||||
|
||||
## 0.11 -> 0.??
|
||||
|
||||
### teloxide
|
||||
|
||||
`rocksdb-storage` feature and associated items were removed.
|
||||
If you are using rocksdb storage, you'll need to either write `Storage` impl yourself, or use a third party crate.
|
||||
<!-- FIXME: add a link once there *is* a third party crate -->
|
||||
|
||||
## 0.11 -> 0.11.1
|
||||
|
||||
### teloxide
|
||||
|
|
|
@ -19,7 +19,6 @@ webhooks-axum = ["webhooks", "axum", "tower", "tower-http"]
|
|||
|
||||
sqlite-storage = ["sqlx"]
|
||||
redis-storage = ["redis"]
|
||||
rocksdb-storage = ["rocksdb"]
|
||||
cbor-serializer = ["serde_cbor"]
|
||||
bincode-serializer = ["bincode"]
|
||||
|
||||
|
@ -43,7 +42,6 @@ full = [
|
|||
"webhooks-axum",
|
||||
"sqlite-storage",
|
||||
"redis-storage",
|
||||
"rocksdb-storage",
|
||||
"cbor-serializer",
|
||||
"bincode-serializer",
|
||||
"macros",
|
||||
|
@ -94,9 +92,6 @@ sqlx = { version = "0.6", optional = true, default-features = false, features =
|
|||
"sqlite",
|
||||
] }
|
||||
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
|
||||
rocksdb = { version = "0.19", optional = true, default-features = false, features = [
|
||||
"lz4",
|
||||
] }
|
||||
serde_cbor = { version = "0.11", optional = true }
|
||||
bincode = { version = "1.3", optional = true }
|
||||
axum = { version = "0.5.13", optional = true }
|
||||
|
@ -131,11 +126,6 @@ name = "sqlite"
|
|||
path = "tests/sqlite.rs"
|
||||
required-features = ["sqlite-storage", "cbor-serializer", "bincode-serializer"]
|
||||
|
||||
[[test]]
|
||||
name = "rocksdb"
|
||||
path = "tests/rocksdb.rs"
|
||||
required-features = ["rocksdb-storage", "cbor-serializer", "bincode-serializer"]
|
||||
|
||||
[[example]]
|
||||
name = "dialogue"
|
||||
required-features = ["macros"]
|
||||
|
|
|
@ -9,9 +9,6 @@ mod redis_storage;
|
|||
#[cfg(feature = "sqlite-storage")]
|
||||
mod sqlite_storage;
|
||||
|
||||
#[cfg(feature = "rocksdb-storage")]
|
||||
mod rocksdb_storage;
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use teloxide_core::types::ChatId;
|
||||
|
||||
|
@ -28,9 +25,6 @@ use std::sync::Arc;
|
|||
#[cfg(feature = "sqlite-storage")]
|
||||
pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
|
||||
|
||||
#[cfg(feature = "rocksdb-storage")]
|
||||
pub use rocksdb_storage::{RocksDbStorage, RocksDbStorageError};
|
||||
|
||||
/// A storage with an erased error type.
|
||||
pub type ErasedStorage<D> =
|
||||
dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync;
|
||||
|
|
|
@ -1,113 +0,0 @@
|
|||
use super::{serializer::Serializer, Storage};
|
||||
use futures::future::BoxFuture;
|
||||
use rocksdb::{DBCompressionType, DBWithThreadMode, MultiThreaded};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
fmt::{Debug, Display},
|
||||
str,
|
||||
sync::Arc,
|
||||
};
|
||||
use teloxide_core::types::ChatId;
|
||||
use thiserror::Error;
|
||||
|
||||
/// A persistent dialogue storage based on [RocksDb](http://rocksdb.org/).
|
||||
pub struct RocksDbStorage<S> {
|
||||
db: DBWithThreadMode<MultiThreaded>,
|
||||
serializer: S,
|
||||
}
|
||||
|
||||
/// An error returned from [`RocksDbStorage`].
|
||||
#[derive(Debug, Error)]
|
||||
pub enum RocksDbStorageError<SE>
|
||||
where
|
||||
SE: Debug + Display,
|
||||
{
|
||||
#[error("dialogue serialization error: {0}")]
|
||||
SerdeError(SE),
|
||||
|
||||
#[error("RocksDb error: {0}")]
|
||||
RocksDbError(#[from] rocksdb::Error),
|
||||
|
||||
/// Returned from [`RocksDbStorage::remove_dialogue`].
|
||||
#[error("row not found")]
|
||||
DialogueNotFound,
|
||||
}
|
||||
|
||||
impl<S> RocksDbStorage<S> {
|
||||
pub async fn open(
|
||||
path: &str,
|
||||
serializer: S,
|
||||
options: Option<rocksdb::Options>,
|
||||
) -> Result<Arc<Self>, RocksDbStorageError<Infallible>> {
|
||||
let options = match options {
|
||||
Some(opts) => opts,
|
||||
None => {
|
||||
let mut opts = rocksdb::Options::default();
|
||||
opts.set_compression_type(DBCompressionType::Lz4);
|
||||
opts.create_if_missing(true);
|
||||
opts
|
||||
}
|
||||
};
|
||||
|
||||
let db = DBWithThreadMode::<MultiThreaded>::open(&options, path)?;
|
||||
Ok(Arc::new(Self { db, serializer }))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, D> Storage<D> for RocksDbStorage<S>
|
||||
where
|
||||
S: Send + Sync + Serializer<D> + 'static,
|
||||
D: Send + Serialize + DeserializeOwned + 'static,
|
||||
<S as Serializer<D>>::Error: Debug + Display,
|
||||
{
|
||||
type Error = RocksDbStorageError<<S as Serializer<D>>::Error>;
|
||||
|
||||
/// Returns [`RocksDbStorageError::DialogueNotFound`] if a dialogue does not
|
||||
/// exist.
|
||||
fn remove_dialogue(
|
||||
self: Arc<Self>,
|
||||
ChatId(chat_id): ChatId,
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>> {
|
||||
Box::pin(async move {
|
||||
let key = chat_id.to_le_bytes();
|
||||
|
||||
if self.db.get(key)?.is_none() {
|
||||
return Err(RocksDbStorageError::DialogueNotFound);
|
||||
}
|
||||
|
||||
self.db.delete(key).unwrap();
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn update_dialogue(
|
||||
self: Arc<Self>,
|
||||
ChatId(chat_id): ChatId,
|
||||
dialogue: D,
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>> {
|
||||
Box::pin(async move {
|
||||
let d =
|
||||
self.serializer.serialize(&dialogue).map_err(RocksDbStorageError::SerdeError)?;
|
||||
|
||||
let key = chat_id.to_le_bytes();
|
||||
self.db.put(key, &d)?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn get_dialogue(
|
||||
self: Arc<Self>,
|
||||
ChatId(chat_id): ChatId,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||
Box::pin(async move {
|
||||
let key = chat_id.to_le_bytes();
|
||||
self.db
|
||||
.get(key)?
|
||||
.map(|d| self.serializer.deserialize(&d).map_err(RocksDbStorageError::SerdeError))
|
||||
.transpose()
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,95 +0,0 @@
|
|||
use std::{
|
||||
fmt::{Debug, Display},
|
||||
fs,
|
||||
sync::Arc,
|
||||
};
|
||||
use teloxide::{
|
||||
dispatching::dialogue::{RocksDbStorage, RocksDbStorageError, Serializer, Storage},
|
||||
types::ChatId,
|
||||
};
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_rocksdb_json() {
|
||||
fs::remove_dir_all("./test_db1").ok();
|
||||
fs::create_dir("./test_db1").unwrap();
|
||||
let storage = RocksDbStorage::open(
|
||||
"./test_db1/test_db1.rocksdb",
|
||||
teloxide::dispatching::dialogue::serializer::Json,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
test_rocksdb(storage).await;
|
||||
fs::remove_dir_all("./test_db1").unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_rocksdb_bincode() {
|
||||
fs::remove_dir_all("./test_db2").ok();
|
||||
fs::create_dir("./test_db2").unwrap();
|
||||
let storage = RocksDbStorage::open(
|
||||
"./test_db2/test_db2.rocksdb",
|
||||
teloxide::dispatching::dialogue::serializer::Bincode,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
test_rocksdb(storage).await;
|
||||
fs::remove_dir_all("./test_db2").unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_rocksdb_cbor() {
|
||||
fs::remove_dir_all("./test_db3").ok();
|
||||
fs::create_dir("./test_db3").unwrap();
|
||||
let storage = RocksDbStorage::open(
|
||||
"./test_db3/test_db3.rocksdb",
|
||||
teloxide::dispatching::dialogue::serializer::Cbor,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
test_rocksdb(storage).await;
|
||||
fs::remove_dir_all("./test_db3").unwrap();
|
||||
}
|
||||
|
||||
type Dialogue = String;
|
||||
|
||||
macro_rules! test_dialogues {
|
||||
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
|
||||
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(1)).await.unwrap(), $_0);
|
||||
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(11)).await.unwrap(), $_1);
|
||||
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(256)).await.unwrap(), $_2);
|
||||
};
|
||||
}
|
||||
|
||||
async fn test_rocksdb<S>(storage: Arc<RocksDbStorage<S>>)
|
||||
where
|
||||
S: Send + Sync + Serializer<Dialogue> + 'static,
|
||||
<S as Serializer<Dialogue>>::Error: Debug + Display,
|
||||
{
|
||||
test_dialogues!(storage, None, None, None);
|
||||
|
||||
Arc::clone(&storage).update_dialogue(ChatId(1), "ABC".to_owned()).await.unwrap();
|
||||
Arc::clone(&storage).update_dialogue(ChatId(11), "DEF".to_owned()).await.unwrap();
|
||||
Arc::clone(&storage).update_dialogue(ChatId(256), "GHI".to_owned()).await.unwrap();
|
||||
|
||||
test_dialogues!(
|
||||
storage,
|
||||
Some("ABC".to_owned()),
|
||||
Some("DEF".to_owned()),
|
||||
Some("GHI".to_owned())
|
||||
);
|
||||
|
||||
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap();
|
||||
Arc::clone(&storage).remove_dialogue(ChatId(11)).await.unwrap();
|
||||
Arc::clone(&storage).remove_dialogue(ChatId(256)).await.unwrap();
|
||||
|
||||
test_dialogues!(storage, None, None, None);
|
||||
|
||||
// Check that a try to remove a non-existing dialogue results in an error.
|
||||
assert!(matches!(
|
||||
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap_err(),
|
||||
RocksDbStorageError::DialogueNotFound
|
||||
));
|
||||
}
|
Loading…
Reference in a new issue