Merge pull request #1081 from arrtchiu/deadpool-redis

Use deadpool-redis for pooling (fixes #707)
This commit is contained in:
Tima Kinsart 2024-07-21 04:44:26 +00:00 committed by GitHub
commit 2c894b05e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 59 additions and 38 deletions

View file

@ -46,7 +46,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `filter_web_app_data` - `filter_web_app_data`
- Implement `PostgresStorage`, a persistent dialogue storage based on [PostgreSQL](https://www.postgresql.org/)([PR 996](https://github.com/teloxide/teloxide/pull/996)). - Implement `PostgresStorage`, a persistent dialogue storage based on [PostgreSQL](https://www.postgresql.org/)([PR 996](https://github.com/teloxide/teloxide/pull/996)).
- Implement `GetChatId` for `teloxide_core::types::{Chat, ChatJoinRequest, ChatMemberUpdated}`. - Implement `GetChatId` for `teloxide_core::types::{Chat, ChatJoinRequest, ChatMemberUpdated}`.
- Add `MessageExt::filter_story` method for the corresponding `MediaKind::Story` variant ([PR 1087](https://github.com/teloxide/teloxide/pull/1087)) - Use [deadpool-redis](https://crates.io/crates/deadpool-redis) for Redis connection pooling ([PR 1081](https://github.com/teloxide/teloxide/pull/1081)).
- Add `MessageExt::filter_story` method for the corresponding `MediaKind::Story` variant ([PR 1087](https://github.com/teloxide/teloxide/pull/1087)).
### Fixed ### Fixed

55
Cargo.lock generated
View file

@ -405,6 +405,37 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "deadpool"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"tokio",
]
[[package]]
name = "deadpool-redis"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36f2381b0e993d06a1f6d49f486b33bc4004085bf980340fc05726bacc681fff"
dependencies = [
"deadpool",
"redis",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49"
dependencies = [
"tokio",
]
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.18" version = "0.99.18"
@ -912,7 +943,7 @@ dependencies = [
"httpdate", "httpdate",
"itoa", "itoa",
"pin-project-lite", "pin-project-lite",
"socket2 0.5.6", "socket2",
"tokio", "tokio",
"tower-service", "tower-service",
"tracing", "tracing",
@ -1514,8 +1545,6 @@ dependencies = [
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"ryu", "ryu",
"sha1_smol",
"socket2 0.4.10",
"tokio", "tokio",
"tokio-util", "tokio-util",
"url", "url",
@ -1864,12 +1893,6 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.10.8" version = "0.10.8"
@ -1905,16 +1928,6 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.5.6" version = "0.5.6"
@ -2198,6 +2211,7 @@ dependencies = [
"bincode", "bincode",
"bytes", "bytes",
"chrono", "chrono",
"deadpool-redis",
"derive_more", "derive_more",
"dptree", "dptree",
"either", "either",
@ -2207,7 +2221,6 @@ dependencies = [
"pin-project", "pin-project",
"pretty_env_logger 0.5.0", "pretty_env_logger 0.5.0",
"rand", "rand",
"redis",
"reqwest", "reqwest",
"serde", "serde",
"serde_cbor", "serde_cbor",
@ -2339,7 +2352,7 @@ dependencies = [
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2 0.5.6", "socket2",
"tokio-macros", "tokio-macros",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]

View file

@ -31,7 +31,7 @@ sqlite-storage-nativetls = [
sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"]
postgres-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"] postgres-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"]
postgres-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] postgres-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"]
redis-storage = ["redis"] redis-storage = ["deadpool-redis"]
cbor-serializer = ["serde_cbor"] cbor-serializer = ["serde_cbor"]
bincode-serializer = ["bincode"] bincode-serializer = ["bincode"]
@ -109,7 +109,7 @@ sqlx = { version = "0.7.3", optional = true, default-features = false, features
"sqlite", "sqlite",
"postgres" "postgres"
] } ] }
redis = { version = "0.24", features = ["tokio-comp"], optional = true } deadpool-redis = { version = "0.14", features = ["rt_tokio_1"], optional = true }
serde_cbor = { version = "0.11", optional = true } serde_cbor = { version = "0.11", optional = true }
bincode = { version = "1.3", optional = true } bincode = { version = "1.3", optional = true }
axum = { version = "0.7.0", optional = true } axum = { version = "0.7.0", optional = true }

View file

@ -1,16 +1,15 @@
use super::{serializer::Serializer, Storage}; use super::{serializer::Serializer, Storage};
use deadpool_redis::{redis, CreatePoolError, PoolError, Runtime};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use redis::{AsyncCommands, IntoConnectionInfo}; use redis::AsyncCommands;
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use std::{ use std::{
convert::Infallible, convert::Infallible,
fmt::{Debug, Display}, fmt::{Debug, Display},
ops::DerefMut,
sync::Arc, sync::Arc,
}; };
use teloxide_core::types::ChatId; use teloxide_core::types::ChatId;
use thiserror::Error; use thiserror::Error;
use tokio::sync::Mutex;
/// An error returned from [`RedisStorage`]. /// An error returned from [`RedisStorage`].
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -24,6 +23,12 @@ where
#[error("error from Redis: {0}")] #[error("error from Redis: {0}")]
RedisError(#[from] redis::RedisError), RedisError(#[from] redis::RedisError),
#[error("error creating redis pool: {0}")]
CreatePoolError(#[from] CreatePoolError),
#[error("redis pool error: {0}")]
PoolError(#[from] PoolError),
/// Returned from [`RedisStorage::remove_dialogue`]. /// Returned from [`RedisStorage::remove_dialogue`].
#[error("row not found")] #[error("row not found")]
DialogueNotFound, DialogueNotFound,
@ -31,19 +36,19 @@ where
/// A dialogue storage based on [Redis](https://redis.io/). /// A dialogue storage based on [Redis](https://redis.io/).
pub struct RedisStorage<S> { pub struct RedisStorage<S> {
conn: Mutex<redis::aio::Connection>, pool: deadpool_redis::Pool,
serializer: S, serializer: S,
} }
impl<S> RedisStorage<S> { impl<S> RedisStorage<S> {
pub async fn open( pub async fn open(
url: impl IntoConnectionInfo, url: &str,
serializer: S, serializer: S,
) -> Result<Arc<Self>, RedisStorageError<Infallible>> { ) -> Result<Arc<Self>, RedisStorageError<Infallible>> {
Ok(Arc::new(Self { let config = deadpool_redis::Config::from_url(url);
conn: Mutex::new(redis::Client::open(url)?.get_async_connection().await?), let pool = config.create_pool(Some(Runtime::Tokio1))?;
serializer,
})) Ok(Arc::new(Self { pool, serializer }))
} }
} }
@ -60,10 +65,12 @@ where
ChatId(chat_id): ChatId, ChatId(chat_id): ChatId,
) -> BoxFuture<'static, Result<(), Self::Error>> { ) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move { Box::pin(async move {
let mut conn = self.pool.get().await?;
let deleted_rows_count = redis::pipe() let deleted_rows_count = redis::pipe()
.atomic() .atomic()
.del(chat_id) .del(chat_id)
.query_async::<_, redis::Value>(self.conn.lock().await.deref_mut()) .query_async::<_, redis::Value>(&mut conn)
.await?; .await?;
if let redis::Value::Bulk(values) = deleted_rows_count { if let redis::Value::Bulk(values) = deleted_rows_count {
@ -89,7 +96,7 @@ where
Box::pin(async move { Box::pin(async move {
let dialogue = let dialogue =
self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?; self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?;
() = self.conn.lock().await.set::<_, Vec<u8>, _>(chat_id, dialogue).await?; () = self.pool.get().await?.set::<_, Vec<u8>, _>(chat_id, dialogue).await?;
Ok(()) Ok(())
}) })
} }
@ -99,9 +106,9 @@ where
ChatId(chat_id): ChatId, ChatId(chat_id): ChatId,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { ) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move { Box::pin(async move {
self.conn self.pool
.lock() .get()
.await .await?
.get::<_, Option<Vec<u8>>>(chat_id) .get::<_, Option<Vec<u8>>>(chat_id)
.await? .await?
.map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError)) .map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError))