Don't reconnect to Redis every time

This commit is contained in:
Maximilian Siling 2020-04-19 19:24:24 +03:00
parent 798102a7d7
commit a67d896d26
2 changed files with 21 additions and 12 deletions

View file

@ -198,10 +198,12 @@ async fn run() {
handle_message(cx).await.expect("Something wrong with the bot!")
},
Arc::new(
// You can also choose Serializer::JSON or Serializer::Bincode
// All serializer but JSON require enabling feature "serializer-<name>",
// e. g. "serializer-cbor" or "serializer-bincode"
RedisStorage::open("redis://127.0.0.1:6379", Serializer::CBOR)
// You can also choose Serializer::JSON or Serializer::CBOR
// All serializer but JSON require enabling feature
// "serializer-<name>", e. g. "serializer-cbor"
// or "serializer-bincode"
RedisStorage::open("redis://127.0.0.1:6379", Serializer::Bincode)
.await
.unwrap(),
),
))

View file

@ -5,8 +5,9 @@ use super::{
use futures::future::BoxFuture;
use redis::{AsyncCommands, FromRedisValue, IntoConnectionInfo};
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use std::{ops::DerefMut, sync::Arc};
use thiserror::Error;
use tokio::sync::Mutex;
#[derive(Debug, Error)]
pub enum Error {
@ -19,16 +20,21 @@ pub enum Error {
type Result<T, E = Error> = std::result::Result<T, E>;
pub struct RedisStorage {
client: redis::Client,
conn: Mutex<redis::aio::Connection>,
serializer: Serializer,
}
impl RedisStorage {
pub fn open(
pub async fn open(
url: impl IntoConnectionInfo,
serializer: Serializer,
) -> Result<Self> {
Ok(Self { client: redis::Client::open(url)?, serializer })
Ok(Self {
conn: Mutex::new(
redis::Client::open(url)?.get_async_connection().await?,
),
serializer,
})
}
}
@ -45,12 +51,11 @@ where
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>>> {
Box::pin(async move {
let mut conn = self.client.get_async_connection().await?;
let res = redis::pipe()
.atomic()
.get(chat_id)
.del(chat_id).ignore()
.query_async::<_, redis::Value>(&mut conn)
.query_async::<_, redis::Value>(self.conn.lock().await.deref_mut())
.await?;
// We're expecting `.pipe()` to return us an exactly one result in bulk,
// so all other branches should be unreachable
@ -73,9 +78,11 @@ where
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>>> {
Box::pin(async move {
let mut conn = self.client.get_async_connection().await?;
let dialogue = self.serializer.serialize(&dialogue)?;
Ok(conn
Ok(self
.conn
.lock()
.await
.getset::<_, Vec<u8>, Option<Vec<u8>>>(chat_id, dialogue)
.await?
.map(|d| self.serializer.deserialize(&d))