From 576b14586e469ab1cbab7fed52bf484148ac31f6 Mon Sep 17 00:00:00 2001 From: arrtchiu Date: Sun, 30 Jun 2024 13:20:35 -0700 Subject: [PATCH] use deadpool-redis for pooling (fixes #707) --- Cargo.lock | 58 +++++++++++-------- crates/teloxide/Cargo.toml | 4 +- .../dialogue/storage/redis_storage.rs | 35 ++++++----- 3 files changed, 58 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac559433..91780631 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -399,6 +399,36 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "deadpool" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6541a3916932fe57768d4be0b1ffb5ec7cbf74ca8c903fdfd5c0fe8aa958f0ed" +dependencies = [ + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-redis" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ff315fab2a7a42132352909afc81140d06b8bbfd1414b098ce278e3f95dd1b9" +dependencies = [ + "deadpool", + "redis", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" +dependencies = [ + "tokio", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -884,7 +914,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2", "tokio", "tower-service", "tracing", @@ -1456,9 +1486,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.24.0" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" dependencies = [ "async-trait", "bytes", @@ -1468,8 +1498,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "ryu", - "sha1_smol", - "socket2 0.4.10", "tokio", "tokio-util", "url", @@ -1786,12 +1814,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.10.8" @@ -1827,16 +1849,6 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -[[package]] -name = "socket2" -version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" -dependencies = [ - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.5.6" @@ -2115,6 +2127,7 @@ dependencies = [ "bincode", "bytes", "chrono", + "deadpool-redis", "derive_more", "dptree", "either", @@ -2124,7 +2137,6 @@ dependencies = [ "pin-project", "pretty_env_logger 0.5.0", "rand", - "redis", "reqwest", "serde", "serde_cbor", @@ -2256,7 +2268,7 @@ dependencies = [ "num_cpus", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.6", + "socket2", "tokio-macros", "windows-sys 0.48.0", ] diff --git a/crates/teloxide/Cargo.toml b/crates/teloxide/Cargo.toml index 8d26b8a5..12936835 100644 --- a/crates/teloxide/Cargo.toml +++ b/crates/teloxide/Cargo.toml @@ -31,7 +31,7 @@ sqlite-storage-nativetls = [ sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] postgres-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"] postgres-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] -redis-storage = ["redis"] +redis-storage = ["deadpool-redis"] cbor-serializer = ["serde_cbor"] bincode-serializer = ["bincode"] @@ -108,7 +108,7 @@ sqlx = { version = "0.7.3", optional = true, default-features = false, features "sqlite", "postgres" ] } -redis = { version = "0.24", features = ["tokio-comp"], optional = true } +deadpool-redis = { version = "0.15.1", features = ["rt_tokio_1"], optional = true } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } axum = { version = "0.6.0", optional = true } diff --git a/crates/teloxide/src/dispatching/dialogue/storage/redis_storage.rs b/crates/teloxide/src/dispatching/dialogue/storage/redis_storage.rs index de9294d8..17a65ea7 100644 --- a/crates/teloxide/src/dispatching/dialogue/storage/redis_storage.rs +++ b/crates/teloxide/src/dispatching/dialogue/storage/redis_storage.rs @@ -1,16 +1,15 @@ use super::{serializer::Serializer, Storage}; +use deadpool_redis::{redis, CreatePoolError, PoolError, Runtime}; use futures::future::BoxFuture; -use redis::{AsyncCommands, IntoConnectionInfo}; +use redis::AsyncCommands; use serde::{de::DeserializeOwned, Serialize}; use std::{ convert::Infallible, fmt::{Debug, Display}, - ops::DerefMut, sync::Arc, }; use teloxide_core::types::ChatId; use thiserror::Error; -use tokio::sync::Mutex; /// An error returned from [`RedisStorage`]. #[derive(Debug, Error)] @@ -24,6 +23,12 @@ where #[error("error from Redis: {0}")] RedisError(#[from] redis::RedisError), + #[error("error creating redis pool: {0}")] + CreatePoolError(#[from] CreatePoolError), + + #[error("redis pool error: {0}")] + PoolError(#[from] PoolError), + /// Returned from [`RedisStorage::remove_dialogue`]. #[error("row not found")] DialogueNotFound, @@ -31,19 +36,19 @@ where /// A dialogue storage based on [Redis](https://redis.io/). pub struct RedisStorage { - conn: Mutex, + pool: deadpool_redis::Pool, serializer: S, } impl RedisStorage { pub async fn open( - url: impl IntoConnectionInfo, + url: &str, serializer: S, ) -> Result, RedisStorageError> { - Ok(Arc::new(Self { - conn: Mutex::new(redis::Client::open(url)?.get_async_connection().await?), - serializer, - })) + let config = deadpool_redis::Config::from_url(url); + let pool = config.create_pool(Some(Runtime::Tokio1))?; + + Ok(Arc::new(Self { pool, serializer })) } } @@ -60,10 +65,12 @@ where ChatId(chat_id): ChatId, ) -> BoxFuture<'static, Result<(), Self::Error>> { Box::pin(async move { + let mut pool = self.pool.get().await?; + let deleted_rows_count = redis::pipe() .atomic() .del(chat_id) - .query_async::<_, redis::Value>(self.conn.lock().await.deref_mut()) + .query_async::<_, redis::Value>(&mut pool) .await?; if let redis::Value::Bulk(values) = deleted_rows_count { @@ -89,7 +96,7 @@ where Box::pin(async move { let dialogue = self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?; - self.conn.lock().await.set::<_, Vec, _>(chat_id, dialogue).await?; + self.pool.get().await?.set::<_, Vec, _>(chat_id, dialogue).await?; Ok(()) }) } @@ -99,9 +106,9 @@ where ChatId(chat_id): ChatId, ) -> BoxFuture<'static, Result, Self::Error>> { Box::pin(async move { - self.conn - .lock() - .await + self.pool + .get() + .await? .get::<_, Option>>(chat_id) .await? .map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError))