diff --git a/examples/dialogue_bot_redis/src/main.rs b/examples/dialogue_bot_redis/src/main.rs index b1954c98..c42aea19 100644 --- a/examples/dialogue_bot_redis/src/main.rs +++ b/examples/dialogue_bot_redis/src/main.rs @@ -198,10 +198,12 @@ async fn run() { handle_message(cx).await.expect("Something wrong with the bot!") }, Arc::new( - // You can also choose Serializer::JSON or Serializer::Bincode - // All serializer but JSON require enabling feature "serializer-<name>", - // e. g. "serializer-cbor" or "serializer-bincode" - RedisStorage::open("redis://127.0.0.1:6379", Serializer::CBOR) + // You can also choose Serializer::JSON or Serializer::CBOR + // All serializer but JSON require enabling feature + // "serializer-<name>", e. g. "serializer-cbor" + // or "serializer-bincode" + RedisStorage::open("redis://127.0.0.1:6379", Serializer::Bincode) + .await .unwrap(), ), )) diff --git a/src/dispatching/dialogue/storage/redis_storage.rs b/src/dispatching/dialogue/storage/redis_storage.rs index 2dad6871..b6f45a3a 100644 --- a/src/dispatching/dialogue/storage/redis_storage.rs +++ b/src/dispatching/dialogue/storage/redis_storage.rs @@ -5,8 +5,9 @@ use super::{ use futures::future::BoxFuture; use redis::{AsyncCommands, FromRedisValue, IntoConnectionInfo}; use serde::{de::DeserializeOwned, Serialize}; -use std::sync::Arc; +use std::{ops::DerefMut, sync::Arc}; use thiserror::Error; +use tokio::sync::Mutex; #[derive(Debug, Error)] pub enum Error { @@ -19,16 +20,21 @@ pub enum Error { type Result<T, E = Error> = std::result::Result<T, E>; pub struct RedisStorage { - client: redis::Client, + conn: Mutex<redis::aio::Connection>, serializer: Serializer, } impl RedisStorage { - pub fn open( + pub async fn open( url: impl IntoConnectionInfo, serializer: Serializer, ) -> Result<Self> { - Ok(Self { client: redis::Client::open(url)?, serializer }) + Ok(Self { + conn: Mutex::new( + redis::Client::open(url)?.get_async_connection().await?, + ), + serializer, + }) } } @@ -45,12 +51,11 @@ where chat_id: i64, ) -> BoxFuture<'static, Result<Option<D>>> { Box::pin(async move { - let mut conn = self.client.get_async_connection().await?; let res = redis::pipe() .atomic() .get(chat_id) .del(chat_id).ignore() - .query_async::<_, redis::Value>(&mut conn) + .query_async::<_, redis::Value>(self.conn.lock().await.deref_mut()) .await?; // We're expecting `.pipe()` to return us an exactly one result in bulk, // so all other branches should be unreachable @@ -73,9 +78,11 @@ where dialogue: D, ) -> BoxFuture<'static, Result<Option<D>>> { Box::pin(async move { - let mut conn = self.client.get_async_connection().await?; let dialogue = self.serializer.serialize(&dialogue)?; - Ok(conn + Ok(self + .conn + .lock() + .await .getset::<_, Vec<u8>, Option<Vec<u8>>>(chat_id, dialogue) .await? .map(|d| self.serializer.deserialize(&d))