mirror of
https://github.com/teloxide/teloxide.git
synced 2024-12-23 06:51:01 +01:00
Merge pull request #753 from xamgore/rocksdb-storage
Extend dialogue storages with RocksDB
This commit is contained in:
commit
926b8ef786
6 changed files with 226 additions and 3 deletions
|
@ -8,11 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
- `teloxide::dispatching::repls::CommandReplExt`, `teloxide::prelude::CommandReplExt` ([issue #740](https://github.com/teloxide/teloxide/issues/740))
|
- The `rocksdb-storage` feature -- enables the RocksDB support ([PR #753](https://github.com/teloxide/teloxide/pull/753))
|
||||||
|
- `teloxide::dispatching::repls::CommandReplExt`, `teloxide::prelude::CommandReplExt` ([issue #740](https://github.com/teloxide/teloxide/issues/740))
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
|
|
||||||
- `teloxide::dispatching::repls::{commands_repl, commands_repl_with_listener}`, `teloxide::utils::command::BotCommands::ty` (use `CommandReplExt` instead)
|
- `teloxide::dispatching::repls::{commands_repl, commands_repl_with_listener}`, `teloxide::utils::command::BotCommands::ty` (use `CommandReplExt` instead)
|
||||||
|
|
||||||
## 0.11.0 - 2022-10-07
|
## 0.11.0 - 2022-10-07
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ webhooks-axum = ["webhooks", "axum", "tower", "tower-http"]
|
||||||
|
|
||||||
sqlite-storage = ["sqlx"]
|
sqlite-storage = ["sqlx"]
|
||||||
redis-storage = ["redis"]
|
redis-storage = ["redis"]
|
||||||
|
rocksdb-storage = ["rocksdb"]
|
||||||
cbor-serializer = ["serde_cbor"]
|
cbor-serializer = ["serde_cbor"]
|
||||||
bincode-serializer = ["bincode"]
|
bincode-serializer = ["bincode"]
|
||||||
|
|
||||||
|
@ -42,6 +43,7 @@ full = [
|
||||||
"webhooks-axum",
|
"webhooks-axum",
|
||||||
"sqlite-storage",
|
"sqlite-storage",
|
||||||
"redis-storage",
|
"redis-storage",
|
||||||
|
"rocksdb-storage",
|
||||||
"cbor-serializer",
|
"cbor-serializer",
|
||||||
"bincode-serializer",
|
"bincode-serializer",
|
||||||
"macros",
|
"macros",
|
||||||
|
@ -92,6 +94,9 @@ sqlx = { version = "0.6", optional = true, default-features = false, features =
|
||||||
"sqlite",
|
"sqlite",
|
||||||
] }
|
] }
|
||||||
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
|
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
|
||||||
|
rocksdb = { version = "0.19", optional = true, default-features = false, features = [
|
||||||
|
"lz4",
|
||||||
|
] }
|
||||||
serde_cbor = { version = "0.11", optional = true }
|
serde_cbor = { version = "0.11", optional = true }
|
||||||
bincode = { version = "1.3", optional = true }
|
bincode = { version = "1.3", optional = true }
|
||||||
axum = { version = "0.5.13", optional = true }
|
axum = { version = "0.5.13", optional = true }
|
||||||
|
|
|
@ -31,10 +31,11 @@
|
||||||
|
|
||||||
- **Feature-rich.** You can use both long polling and webhooks, configure an underlying HTTPS client, set a custom URL of a Telegram API server, do graceful shutdown, and much more.
|
- **Feature-rich.** You can use both long polling and webhooks, configure an underlying HTTPS client, set a custom URL of a Telegram API server, do graceful shutdown, and much more.
|
||||||
|
|
||||||
- **Simple dialogues.** Our dialogues subsystem is simple and easy-to-use, and, furthermore, is agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis] and [Sqlite].
|
- **Simple dialogues.** Our dialogues subsystem is simple and easy-to-use, and, furthermore, is agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis], [RocksDB] and [Sqlite].
|
||||||
|
|
||||||
[persistence]: https://en.wikipedia.org/wiki/Persistence_(computer_science)
|
[persistence]: https://en.wikipedia.org/wiki/Persistence_(computer_science)
|
||||||
[Redis]: https://redis.io/
|
[Redis]: https://redis.io/
|
||||||
|
[RocksDB]: https://rocksdb.org/
|
||||||
[Sqlite]: https://www.sqlite.org
|
[Sqlite]: https://www.sqlite.org
|
||||||
|
|
||||||
- **Strongly typed commands.** Define bot commands as an `enum` and teloxide will parse them automatically — just like JSON structures in [`serde-json`] and command-line arguments in [`structopt`].
|
- **Strongly typed commands.** Define bot commands as an `enum` and teloxide will parse them automatically — just like JSON structures in [`serde-json`] and command-line arguments in [`structopt`].
|
||||||
|
|
|
@ -9,6 +9,9 @@ mod redis_storage;
|
||||||
#[cfg(feature = "sqlite-storage")]
|
#[cfg(feature = "sqlite-storage")]
|
||||||
mod sqlite_storage;
|
mod sqlite_storage;
|
||||||
|
|
||||||
|
#[cfg(feature = "rocksdb-storage")]
|
||||||
|
mod rocksdb_storage;
|
||||||
|
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use teloxide_core::types::ChatId;
|
use teloxide_core::types::ChatId;
|
||||||
|
|
||||||
|
@ -25,6 +28,9 @@ use std::sync::Arc;
|
||||||
#[cfg(feature = "sqlite-storage")]
|
#[cfg(feature = "sqlite-storage")]
|
||||||
pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
|
pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
|
||||||
|
|
||||||
|
#[cfg(feature = "rocksdb-storage")]
|
||||||
|
pub use rocksdb_storage::{RocksDbStorage, RocksDbStorageError};
|
||||||
|
|
||||||
/// A storage with an erased error type.
|
/// A storage with an erased error type.
|
||||||
pub type ErasedStorage<D> =
|
pub type ErasedStorage<D> =
|
||||||
dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync;
|
dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync;
|
||||||
|
@ -41,10 +47,12 @@ pub type ErasedStorage<D> =
|
||||||
///
|
///
|
||||||
/// - [`InMemStorage`] -- a storage based on [`std::collections::HashMap`].
|
/// - [`InMemStorage`] -- a storage based on [`std::collections::HashMap`].
|
||||||
/// - [`RedisStorage`] -- a Redis-based storage.
|
/// - [`RedisStorage`] -- a Redis-based storage.
|
||||||
|
/// - [`RocksDbStorage`] -- a RocksDB-based persistent storage.
|
||||||
/// - [`SqliteStorage`] -- an SQLite-based persistent storage.
|
/// - [`SqliteStorage`] -- an SQLite-based persistent storage.
|
||||||
///
|
///
|
||||||
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
|
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
|
||||||
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
|
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
|
||||||
|
/// [`RocksDbStorage`]: crate::dispatching::dialogue::RocksDbStorage
|
||||||
/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage
|
/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage
|
||||||
pub trait Storage<D> {
|
pub trait Storage<D> {
|
||||||
type Error;
|
type Error;
|
||||||
|
|
113
src/dispatching/dialogue/storage/rocksdb_storage.rs
Normal file
113
src/dispatching/dialogue/storage/rocksdb_storage.rs
Normal file
|
@ -0,0 +1,113 @@
|
||||||
|
use super::{serializer::Serializer, Storage};
|
||||||
|
use futures::future::BoxFuture;
|
||||||
|
use rocksdb::{DBCompressionType, DBWithThreadMode, MultiThreaded};
|
||||||
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
|
fmt::{Debug, Display},
|
||||||
|
str,
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
use teloxide_core::types::ChatId;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
/// A persistent dialogue storage based on [RocksDb](http://rocksdb.org/).
|
||||||
|
pub struct RocksDbStorage<S> {
|
||||||
|
db: DBWithThreadMode<MultiThreaded>,
|
||||||
|
serializer: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An error returned from [`RocksDbStorage`].
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum RocksDbStorageError<SE>
|
||||||
|
where
|
||||||
|
SE: Debug + Display,
|
||||||
|
{
|
||||||
|
#[error("dialogue serialization error: {0}")]
|
||||||
|
SerdeError(SE),
|
||||||
|
|
||||||
|
#[error("RocksDb error: {0}")]
|
||||||
|
RocksDbError(#[from] rocksdb::Error),
|
||||||
|
|
||||||
|
/// Returned from [`RocksDbStorage::remove_dialogue`].
|
||||||
|
#[error("row not found")]
|
||||||
|
DialogueNotFound,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> RocksDbStorage<S> {
|
||||||
|
pub async fn open(
|
||||||
|
path: &str,
|
||||||
|
serializer: S,
|
||||||
|
options: Option<rocksdb::Options>,
|
||||||
|
) -> Result<Arc<Self>, RocksDbStorageError<Infallible>> {
|
||||||
|
let options = match options {
|
||||||
|
Some(opts) => opts,
|
||||||
|
None => {
|
||||||
|
let mut opts = rocksdb::Options::default();
|
||||||
|
opts.set_compression_type(DBCompressionType::Lz4);
|
||||||
|
opts.create_if_missing(true);
|
||||||
|
opts
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let db = DBWithThreadMode::<MultiThreaded>::open(&options, path)?;
|
||||||
|
Ok(Arc::new(Self { db, serializer }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, D> Storage<D> for RocksDbStorage<S>
|
||||||
|
where
|
||||||
|
S: Send + Sync + Serializer<D> + 'static,
|
||||||
|
D: Send + Serialize + DeserializeOwned + 'static,
|
||||||
|
<S as Serializer<D>>::Error: Debug + Display,
|
||||||
|
{
|
||||||
|
type Error = RocksDbStorageError<<S as Serializer<D>>::Error>;
|
||||||
|
|
||||||
|
/// Returns [`RocksDbStorageError::DialogueNotFound`] if a dialogue does not
|
||||||
|
/// exist.
|
||||||
|
fn remove_dialogue(
|
||||||
|
self: Arc<Self>,
|
||||||
|
ChatId(chat_id): ChatId,
|
||||||
|
) -> BoxFuture<'static, Result<(), Self::Error>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
let key = chat_id.to_le_bytes();
|
||||||
|
|
||||||
|
if self.db.get(&key)?.is_none() {
|
||||||
|
return Err(RocksDbStorageError::DialogueNotFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.db.delete(&key).unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_dialogue(
|
||||||
|
self: Arc<Self>,
|
||||||
|
ChatId(chat_id): ChatId,
|
||||||
|
dialogue: D,
|
||||||
|
) -> BoxFuture<'static, Result<(), Self::Error>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
let d =
|
||||||
|
self.serializer.serialize(&dialogue).map_err(RocksDbStorageError::SerdeError)?;
|
||||||
|
|
||||||
|
let key = chat_id.to_le_bytes();
|
||||||
|
self.db.put(&key, &d)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_dialogue(
|
||||||
|
self: Arc<Self>,
|
||||||
|
ChatId(chat_id): ChatId,
|
||||||
|
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
let key = chat_id.to_le_bytes();
|
||||||
|
self.db
|
||||||
|
.get(&key)?
|
||||||
|
.map(|d| self.serializer.deserialize(&d).map_err(RocksDbStorageError::SerdeError))
|
||||||
|
.transpose()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
95
tests/rocksdb.rs
Normal file
95
tests/rocksdb.rs
Normal file
|
@ -0,0 +1,95 @@
|
||||||
|
use std::{
|
||||||
|
fmt::{Debug, Display},
|
||||||
|
fs,
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
use teloxide::{
|
||||||
|
dispatching::dialogue::{RocksDbStorage, RocksDbStorageError, Serializer, Storage},
|
||||||
|
types::ChatId,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn test_rocksdb_json() {
|
||||||
|
fs::remove_dir_all("./test_db1").ok();
|
||||||
|
fs::create_dir("./test_db1").unwrap();
|
||||||
|
let storage = RocksDbStorage::open(
|
||||||
|
"./test_db1/test_db1.rocksdb",
|
||||||
|
teloxide::dispatching::dialogue::serializer::Json,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
test_rocksdb(storage).await;
|
||||||
|
fs::remove_dir_all("./test_db1").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn test_rocksdb_bincode() {
|
||||||
|
fs::remove_dir_all("./test_db2").ok();
|
||||||
|
fs::create_dir("./test_db2").unwrap();
|
||||||
|
let storage = RocksDbStorage::open(
|
||||||
|
"./test_db2/test_db2.rocksdb",
|
||||||
|
teloxide::dispatching::dialogue::serializer::Bincode,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
test_rocksdb(storage).await;
|
||||||
|
fs::remove_dir_all("./test_db2").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread")]
|
||||||
|
async fn test_rocksdb_cbor() {
|
||||||
|
fs::remove_dir_all("./test_db3").ok();
|
||||||
|
fs::create_dir("./test_db3").unwrap();
|
||||||
|
let storage = RocksDbStorage::open(
|
||||||
|
"./test_db3/test_db3.rocksdb",
|
||||||
|
teloxide::dispatching::dialogue::serializer::Cbor,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
test_rocksdb(storage).await;
|
||||||
|
fs::remove_dir_all("./test_db3").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
type Dialogue = String;
|
||||||
|
|
||||||
|
macro_rules! test_dialogues {
|
||||||
|
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
|
||||||
|
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(1)).await.unwrap(), $_0);
|
||||||
|
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(11)).await.unwrap(), $_1);
|
||||||
|
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(256)).await.unwrap(), $_2);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn test_rocksdb<S>(storage: Arc<RocksDbStorage<S>>)
|
||||||
|
where
|
||||||
|
S: Send + Sync + Serializer<Dialogue> + 'static,
|
||||||
|
<S as Serializer<Dialogue>>::Error: Debug + Display,
|
||||||
|
{
|
||||||
|
test_dialogues!(storage, None, None, None);
|
||||||
|
|
||||||
|
Arc::clone(&storage).update_dialogue(ChatId(1), "ABC".to_owned()).await.unwrap();
|
||||||
|
Arc::clone(&storage).update_dialogue(ChatId(11), "DEF".to_owned()).await.unwrap();
|
||||||
|
Arc::clone(&storage).update_dialogue(ChatId(256), "GHI".to_owned()).await.unwrap();
|
||||||
|
|
||||||
|
test_dialogues!(
|
||||||
|
storage,
|
||||||
|
Some("ABC".to_owned()),
|
||||||
|
Some("DEF".to_owned()),
|
||||||
|
Some("GHI".to_owned())
|
||||||
|
);
|
||||||
|
|
||||||
|
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap();
|
||||||
|
Arc::clone(&storage).remove_dialogue(ChatId(11)).await.unwrap();
|
||||||
|
Arc::clone(&storage).remove_dialogue(ChatId(256)).await.unwrap();
|
||||||
|
|
||||||
|
test_dialogues!(storage, None, None, None);
|
||||||
|
|
||||||
|
// Check that a try to remove a non-existing dialogue results in an error.
|
||||||
|
assert!(matches!(
|
||||||
|
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap_err(),
|
||||||
|
RocksDbStorageError::DialogueNotFound
|
||||||
|
));
|
||||||
|
}
|
Loading…
Reference in a new issue