Merge pull request #294 from teloxide/sqlite_storage

Sqlite storage v2
This commit is contained in:
Waffle Lapkin 2020-11-04 12:01:21 +03:00 committed by GitHub
commit 89ae390dfe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 373 additions and 8 deletions

View file

@ -39,9 +39,9 @@ jobs:
include: include:
- rust: stable - rust: stable
features: "--features \"redis-storage cbor-serializer bincode-serializer frunk-\"" features: "--features \"redis-storage sqlite-storage cbor-serializer bincode-serializer frunk-\""
- rust: beta - rust: beta
features: "--features \"redis-storage cbor-serializer bincode-serializer frunk-\"" features: "--features \"redis-storage sqlite-storage cbor-serializer bincode-serializer frunk-\""
- rust: nightly - rust: nightly
features: "--all-features" features: "--all-features"

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ Cargo.lock
.idea/ .idea/
.vscode/ .vscode/
examples/*/target examples/*/target
*.sqlite

View file

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- Allow arbitrary error types to be returned from (sub)transitions ([issue 242](https://github.com/teloxide/teloxide/issues/242)). - Allow arbitrary error types to be returned from (sub)transitions ([issue 242](https://github.com/teloxide/teloxide/issues/242)).
- The `respond` function, a shortcut for `ResponseResult::Ok(())`. - The `respond` function, a shortcut for `ResponseResult::Ok(())`.
- The `sqlite-storage` feature -- enables SQLite support.
### Changed ### Changed
- Allow `bot_name` be `N`, where `N: Into<String> + ...` in `commands_repl` & `commands_repl_with_listener`. - Allow `bot_name` be `N`, where `N: Into<String> + ...` in `commands_repl` & `commands_repl_with_listener`.

View file

@ -7,7 +7,7 @@ To change the source code, fork the `dev` branch of this repository and work ins
``` ```
cargo clippy --all --all-features --all-targets cargo clippy --all --all-features --all-targets
cargo test --all cargo test --all
cargo doc --open RUSTDOCFLAGS="--cfg docsrs" cargo doc --open --all-features
# Using nightly rustfmt # Using nightly rustfmt
cargo +nightly fmt --all -- --check cargo +nightly fmt --all -- --check
``` ```

View file

@ -24,6 +24,7 @@ authors = [
maintenance = { status = "actively-developed" } maintenance = { status = "actively-developed" }
[features] [features]
sqlite-storage = ["sqlx"]
redis-storage = ["redis"] redis-storage = ["redis"]
cbor-serializer = ["serde_cbor"] cbor-serializer = ["serde_cbor"]
bincode-serializer = ["bincode"] bincode-serializer = ["bincode"]
@ -54,6 +55,11 @@ futures = "0.3.5"
pin-project = "0.4.22" pin-project = "0.4.22"
serde_with_macros = "1.1.0" serde_with_macros = "1.1.0"
sqlx = { version = "0.4.0-beta.1", optional = true, default-features = false, features = [
"runtime-tokio",
"macros",
"sqlite",
] }
redis = { version = "0.16.0", optional = true } redis = { version = "0.16.0", optional = true }
serde_cbor = { version = "0.11.1", optional = true } serde_cbor = { version = "0.11.1", optional = true }
bincode = { version = "1.3.1", optional = true } bincode = { version = "1.3.1", optional = true }
@ -76,3 +82,8 @@ rustdoc-args = ["--cfg", "docsrs"]
name = "redis" name = "redis"
path = "tests/redis.rs" path = "tests/redis.rs"
required-features = ["redis-storage", "cbor-serializer", "bincode-serializer"] required-features = ["redis-storage", "cbor-serializer", "bincode-serializer"]
[[test]]
name = "sqlite"
path = "tests/sqlite.rs"
required-features = ["sqlite-storage", "cbor-serializer", "bincode-serializer"]

View file

@ -43,10 +43,11 @@
[functional reactive design]: https://en.wikipedia.org/wiki/Functional_reactive_programming [functional reactive design]: https://en.wikipedia.org/wiki/Functional_reactive_programming
[other adaptors]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html [other adaptors]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
- **Dialogues management subsystem.** We have designed our dialogues management subsystem to be easy-to-use, and, furthermore, to be agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis]. - **Dialogues management subsystem.** We have designed our dialogues management subsystem to be easy-to-use, and, furthermore, to be agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis] and [Sqlite].
[persistence]: https://en.wikipedia.org/wiki/Persistence_(computer_science) [persistence]: https://en.wikipedia.org/wiki/Persistence_(computer_science)
[Redis]: https://redis.io/ [Redis]: https://redis.io/
[Sqlite]: https://www.sqlite.org
- **Strongly typed bot commands.** You can describe bot commands as enumerations, and then they'll be automatically constructed from strings — just like JSON structures in [serde-json] and command-line arguments in [structopt]. - **Strongly typed bot commands.** You can describe bot commands as enumerations, and then they'll be automatically constructed from strings — just like JSON structures in [serde-json] and command-line arguments in [structopt].
@ -371,6 +372,7 @@ The second one produces very strange compiler messages due to the `#[tokio::main
## Cargo features ## Cargo features
- `redis-storage` -- enables the [Redis] support. - `redis-storage` -- enables the [Redis] support.
- `sqlite-storage` -- enables the [Sqlite] support.
- `cbor-serializer` -- enables the [CBOR] serializer for dialogues. - `cbor-serializer` -- enables the [CBOR] serializer for dialogues.
- `bincode-serializer` -- enables the [Bincode] serializer for dialogues. - `bincode-serializer` -- enables the [Bincode] serializer for dialogues.
- `frunk` -- enables [`teloxide::utils::UpState`], which allows mapping from a structure of `field1, ..., fieldN` to a structure of `field1, ..., fieldN, fieldN+1`. - `frunk` -- enables [`teloxide::utils::UpState`], which allows mapping from a structure of `field1, ..., fieldN` to a structure of `field1, ..., fieldN, fieldN+1`.

View file

@ -0,0 +1,20 @@
[package]
name = "sqlite_remember_bot"
version = "0.1.0"
authors = ["Maximilian Siling <mouse-art@ya.ru>", "Sergey Levitin <selevit@gmail.com>"]
edition = "2018"
[dependencies]
log = "0.4.8"
pretty_env_logger = "0.4.0"
tokio = { version = "0.2.11", features = ["rt-threaded", "macros"] }
# You can also choose "cbor-serializer" or built-in JSON serializer
teloxide = { path = "../../", features = ["sqlite-storage", "bincode-serializer", "redis-storage"] }
teloxide-macros = { git = "https://github.com/teloxide/teloxide-macros", branch = "master" }
serde = "1.0.104"
futures = "0.3.5"
thiserror = "1.0.15"
derive_more = "0.99.9"

View file

@ -0,0 +1,50 @@
#[macro_use]
extern crate derive_more;
mod states;
mod transitions;
use states::*;
use teloxide::{
dispatching::dialogue::{serializer::JSON, SqliteStorage, Storage},
prelude::*,
};
use thiserror::Error;
type StorageError = <SqliteStorage<JSON> as Storage<Dialogue>>::Error;
#[derive(Debug, Error)]
enum Error {
#[error("error from Telegram: {0}")]
TelegramError(#[from] RequestError),
#[error("error from storage: {0}")]
StorageError(#[from] StorageError),
}
type In = DialogueWithCx<Message, Dialogue, StorageError>;
async fn handle_message(cx: UpdateWithCx<Message>, dialogue: Dialogue) -> TransitionOut<Dialogue> {
match cx.update.text_owned() {
None => {
cx.answer_str("Send me a text message.").await?;
next(dialogue)
}
Some(ans) => dialogue.react(cx, ans).await,
}
}
#[tokio::main]
async fn main() {
let bot = Bot::from_env();
Dispatcher::new(bot)
.messages_handler(DialogueDispatcher::with_storage(
|DialogueWithCx { cx, dialogue }: In| async move {
let dialogue = dialogue.expect("std::convert::Infallible");
handle_message(cx, dialogue).await.expect("Something wrong with the bot!")
},
SqliteStorage::open("db.sqlite", JSON).await.unwrap(),
))
.dispatch()
.await;
}

View file

@ -0,0 +1,23 @@
use teloxide_macros::Transition;
use serde::{Deserialize, Serialize};
#[derive(Transition, From, Serialize, Deserialize)]
pub enum Dialogue {
Start(StartState),
HaveNumber(HaveNumberState),
}
impl Default for Dialogue {
fn default() -> Self {
Self::Start(StartState)
}
}
#[derive(Serialize, Deserialize)]
pub struct StartState;
#[derive(Serialize, Deserialize)]
pub struct HaveNumberState {
pub number: i32,
}

View file

@ -0,0 +1,35 @@
use teloxide::prelude::*;
use teloxide_macros::teloxide;
use super::states::*;
#[teloxide(subtransition)]
async fn start(state: StartState, cx: TransitionIn, ans: String) -> TransitionOut<Dialogue> {
if let Ok(number) = ans.parse() {
cx.answer_str(format!("Remembered number {}. Now use /get or /reset", number)).await?;
next(HaveNumberState { number })
} else {
cx.answer_str("Please, send me a number").await?;
next(state)
}
}
#[teloxide(subtransition)]
async fn have_number(
state: HaveNumberState,
cx: TransitionIn,
ans: String,
) -> TransitionOut<Dialogue> {
let num = state.number;
if ans.starts_with("/get") {
cx.answer_str(format!("Here is your number: {}", num)).await?;
next(state)
} else if ans.starts_with("/reset") {
cx.answer_str("Resetted number").await?;
next(StartState)
} else {
cx.answer_str("Please, send /get or /reset").await?;
next(state)
}
}

View file

@ -167,4 +167,7 @@ pub use teloxide_macros::Transition;
#[cfg_attr(all(teloxide_docsrs, feature = "nightly"), doc(cfg(feature = "redis-storage")))] #[cfg_attr(all(teloxide_docsrs, feature = "nightly"), doc(cfg(feature = "redis-storage")))]
pub use storage::{RedisStorage, RedisStorageError}; pub use storage::{RedisStorage, RedisStorageError};
#[cfg(feature = "sqlite-storage")]
pub use storage::{SqliteStorage, SqliteStorageError};
pub use storage::{serializer, InMemStorage, Serializer, Storage}; pub use storage::{serializer, InMemStorage, Serializer, Storage};

View file

@ -8,8 +8,11 @@ use tokio::sync::Mutex;
/// ///
/// ## Note /// ## Note
/// All the dialogues will be lost after you restart your bot. If you need to /// All the dialogues will be lost after you restart your bot. If you need to
/// store them somewhere on a drive, you need to implement a storage /// store them somewhere on a drive, you should use [`SqliteStorage`],
/// communicating with a DB. /// [`RedisStorage`] or implement your own.
///
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage
#[derive(Debug)] #[derive(Debug)]
pub struct InMemStorage<D> { pub struct InMemStorage<D> {
map: Mutex<HashMap<i64, D>>, map: Mutex<HashMap<i64, D>>,

View file

@ -5,6 +5,9 @@ mod in_mem_storage;
#[cfg(feature = "redis-storage")] #[cfg(feature = "redis-storage")]
mod redis_storage; mod redis_storage;
#[cfg(feature = "sqlite-storage")]
mod sqlite_storage;
use futures::future::BoxFuture; use futures::future::BoxFuture;
pub use in_mem_storage::InMemStorage; pub use in_mem_storage::InMemStorage;
@ -15,14 +18,23 @@ pub use redis_storage::{RedisStorage, RedisStorageError};
pub use serializer::Serializer; pub use serializer::Serializer;
use std::sync::Arc; use std::sync::Arc;
#[cfg(feature = "sqlite-storage")]
pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
/// A storage of dialogues. /// A storage of dialogues.
/// ///
/// You can implement this trait for a structure that communicates with a DB and /// You can implement this trait for a structure that communicates with a DB and
/// be sure that after you restart your bot, all the dialogues won't be lost. /// be sure that after you restart your bot, all the dialogues won't be lost.
/// ///
/// For a storage based on a simple hash map, see [`InMemStorage`]. /// Currently we support the following storages out of the box:
///
/// - [`InMemStorage`] - a storage based on a simple hash map
/// - [`RedisStorage`] - a Redis-based storage
/// - [`SqliteStorage`] - an SQLite-based persistent storage
/// ///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage
pub trait Storage<D> { pub trait Storage<D> {
type Error; type Error;

View file

@ -0,0 +1,137 @@
use super::{serializer::Serializer, Storage};
use futures::future::BoxFuture;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::{sqlite::SqlitePool, Executor};
use std::{
convert::Infallible,
fmt::{Debug, Display},
str,
sync::Arc,
};
use thiserror::Error;
/// A persistent storage based on [SQLite](https://www.sqlite.org/).
pub struct SqliteStorage<S> {
pool: SqlitePool,
serializer: S,
}
/// An error returned from [`SqliteStorage`].
///
/// [`SqliteStorage`]: struct.SqliteStorage.html
#[derive(Debug, Error)]
pub enum SqliteStorageError<SE>
where
SE: Debug + Display,
{
#[error("dialogue serialization error: {0}")]
SerdeError(SE),
#[error("sqlite error: {0}")]
SqliteError(#[from] sqlx::Error),
}
impl<S> SqliteStorage<S> {
pub async fn open(
path: &str,
serializer: S,
) -> Result<Arc<Self>, SqliteStorageError<Infallible>> {
let pool = SqlitePool::connect(format!("sqlite:{}?mode=rwc", path).as_str()).await?;
let mut conn = pool.acquire().await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS teloxide_dialogues (
chat_id BIGINT PRIMARY KEY,
dialogue BLOB NOT NULL
);
"#,
)
.execute(&mut conn)
.await?;
Ok(Arc::new(Self { pool, serializer }))
}
}
impl<S, D> Storage<D> for SqliteStorage<S>
where
S: Send + Sync + Serializer<D> + 'static,
D: Send + Serialize + DeserializeOwned + 'static,
<S as Serializer<D>>::Error: Debug + Display,
{
type Error = SqliteStorageError<<S as Serializer<D>>::Error>;
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
Ok(match get_dialogue(&self.pool, chat_id).await? {
Some(d) => {
let prev_dialogue =
self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)?;
sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?")
.bind(chat_id)
.execute(&self.pool)
.await?;
Some(prev_dialogue)
}
_ => None,
})
})
}
fn update_dialogue(
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
let prev_dialogue = match get_dialogue(&self.pool, chat_id).await? {
Some(d) => {
Some(self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)?)
}
_ => None,
};
let upd_dialogue =
self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?;
self.pool
.acquire()
.await?
.execute(
sqlx::query(
r#"
INSERT INTO teloxide_dialogues VALUES (?, ?)
ON CONFLICT(chat_id) DO UPDATE SET dialogue=excluded.dialogue
"#,
)
.bind(chat_id)
.bind(upd_dialogue),
)
.await?;
Ok(prev_dialogue)
})
}
}
#[derive(sqlx::FromRow)]
struct DialogueDBRow {
dialogue: Vec<u8>,
}
async fn get_dialogue(
pool: &SqlitePool,
chat_id: i64,
) -> Result<Option<Box<Vec<u8>>>, sqlx::Error> {
Ok(
match sqlx::query_as::<_, DialogueDBRow>(
"SELECT dialogue FROM teloxide_dialogues WHERE chat_id = ?",
)
.bind(chat_id)
.fetch_optional(pool)
.await?
{
Some(r) => Some(Box::new(r.dialogue)),
_ => None,
},
)
}

67
tests/sqlite.rs Normal file
View file

@ -0,0 +1,67 @@
use std::{
fmt::{Debug, Display},
future::Future,
sync::Arc,
};
use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, Storage};
#[tokio::test(threaded_scheduler)]
async fn test_sqlite_json() {
let storage =
SqliteStorage::open("./test_db1.sqlite", teloxide::dispatching::dialogue::serializer::JSON)
.await
.unwrap();
test_sqlite(storage).await;
}
#[tokio::test(threaded_scheduler)]
async fn test_sqlite_bincode() {
let storage = SqliteStorage::open(
"./test_db2.sqlite",
teloxide::dispatching::dialogue::serializer::Bincode,
)
.await
.unwrap();
test_sqlite(storage).await;
}
#[tokio::test(threaded_scheduler)]
async fn test_sqlite_cbor() {
let storage =
SqliteStorage::open("./test_db3.sqlite", teloxide::dispatching::dialogue::serializer::CBOR)
.await
.unwrap();
test_sqlite(storage).await;
}
type Dialogue = String;
async fn test_sqlite<S>(storage: Arc<SqliteStorage<S>>)
where
S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display,
{
check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await;
// 1 - ABC, 11 - DEF, 256 - GHI
check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await;
check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await;
// 1 - GKL, 11 - DEF, 256 - MNO
check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await;
check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await;
check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await;
}
async fn check_dialogue<E>(
expected: impl Into<Option<&str>>,
actual: impl Future<Output = Result<Option<Dialogue>, E>>,
) where
E: Debug,
{
assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap())
}