diff --git a/crates/teloxide/Cargo.toml b/crates/teloxide/Cargo.toml index 8286a3a7..f19ea988 100644 --- a/crates/teloxide/Cargo.toml +++ b/crates/teloxide/Cargo.toml @@ -25,7 +25,10 @@ webhooks-axum = ["webhooks", "axum", "tower", "tower-http"] sqlite-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"] sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] +postgres-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"] +postgres-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] redis-storage = ["redis"] + cbor-serializer = ["serde_cbor"] bincode-serializer = ["bincode"] @@ -101,6 +104,7 @@ either = "1.9.0" sqlx = { version = "0.7.3", optional = true, default-features = false, features = [ "macros", "sqlite", + "postgres" ] } redis = { version = "0.21", features = ["tokio-comp"], optional = true } serde_cbor = { version = "0.11", optional = true } diff --git a/crates/teloxide/src/dispatching/dialogue.rs b/crates/teloxide/src/dispatching/dialogue.rs index 6f668e8f..e7938654 100644 --- a/crates/teloxide/src/dispatching/dialogue.rs +++ b/crates/teloxide/src/dispatching/dialogue.rs @@ -99,6 +99,9 @@ pub use self::{RedisStorage, RedisStorageError}; #[cfg(feature = "sqlite-storage-nativetls")] pub use self::{SqliteStorage, SqliteStorageError}; +#[cfg(feature = "postgres-storage-nativetls")] +pub use self::{PostgresStorage, PostgresStorageError}; + pub use get_chat_id::GetChatId; pub use storage::*; diff --git a/crates/teloxide/src/dispatching/dialogue/storage.rs b/crates/teloxide/src/dispatching/dialogue/storage.rs index 73611c61..d071d5ed 100644 --- a/crates/teloxide/src/dispatching/dialogue/storage.rs +++ b/crates/teloxide/src/dispatching/dialogue/storage.rs @@ -9,6 +9,9 @@ mod redis_storage; #[cfg(feature = "sqlite-storage-nativetls")] mod sqlite_storage; +#[cfg(feature = "postgres-storage-nativetls")] +mod postgres_storage; + use futures::future::BoxFuture; use teloxide_core::types::ChatId; @@ -25,6 +28,9 @@ use std::sync::Arc; #[cfg(feature = "sqlite-storage-nativetls")] pub use sqlite_storage::{SqliteStorage, SqliteStorageError}; +#[cfg(feature = "postgres-storage-nativetls")] +pub use postgres_storage::{PostgresStorage, PostgresStorageError}; + /// A storage with an erased error type. pub type ErasedStorage<D> = dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync; diff --git a/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage.rs b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage.rs new file mode 100644 index 00000000..ceb4f2a8 --- /dev/null +++ b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage.rs @@ -0,0 +1,157 @@ +use std::{ + convert::Infallible, + fmt::{Debug, Display}, + str, + sync::Arc, +}; + +use futures::future::BoxFuture; +use serde::{de::DeserializeOwned, Serialize}; +use sqlx::postgres::{PgPool, PgPoolOptions}; +use teloxide_core::types::ChatId; +use thiserror::Error; + +use super::{serializer::Serializer, Storage}; + +/// An error returned from [`PostgresStorage`]. +#[derive(Debug, Error)] +pub enum PostgresStorageError<SE> +where + SE: Debug + Display, +{ + #[error("dialogue serialization error: {0}")] + SerdeError(SE), + + #[error("postgres error: {0}")] + PostgresError(#[from] sqlx::Error), + + // TODO maybe add chat_id for the sake of completeness? + #[error("row not found")] + DialogueNotFound, +} + +/// A persistent dialogue storage based on [PostgreSQL](https://www.postgresql.org/) +pub struct PostgresStorage<S> { + pool: PgPool, + serializer: S, +} + +impl<S> PostgresStorage<S> { + /// Opens a connection pool to the [Postgres](https://www.postgresql.org/) database and creates the table + /// for storing dialogues. + /// + /// Parameters: + /// - database_url: full url to the postgres database, for example + /// `"postgres://postgres:password@localhost/test")` + /// - max_connections: number of connections in creating connection pool. Be + /// mindful of the connection limits for your database, each + /// connection established with the Postgres creates a new process on the + /// server side + /// - serializer: what [`Serializer`] will be used to encode the dialogue + /// data. Available ones are: [`Json`], [`Bincode`], [`Cbor`] + /// + /// [`Json`]: crate::dispatching::dialogue::serializer::Json + /// [`Bincode`]: crate::dispatching::dialogue::serializer::Bincode + /// [`Cbor`]: crate::dispatching::dialogue::serializer::Cbor + pub async fn open( + database_url: &str, + max_connections: u32, + serializer: S, + ) -> Result<Arc<Self>, PostgresStorageError<Infallible>> { + let pool = + PgPoolOptions::new().max_connections(max_connections).connect(database_url).await?; + sqlx::query(include_str!("postgres_storage/queries/create_teloxide_dialogues.sql")) + .execute(&pool) + .await?; + + Ok(Arc::new(Self { pool, serializer })) + } + + async fn get_dialogue( + self: Arc<Self>, + ChatId(chat_id): ChatId, + ) -> Result<Option<Vec<u8>>, sqlx::Error> { + #[derive(sqlx::FromRow)] + struct DialogueDbRow { + dialogue: Vec<u8>, + } + + let bytes = sqlx::query_as::<_, DialogueDbRow>(include_str!( + "postgres_storage/queries/get_dialogue.sql" + )) + .bind(chat_id) + .fetch_optional(&self.pool) + .await? + .map(|r| r.dialogue); + + Ok(bytes) + } +} + +// FIXME: these methods' bodies are almostly the same as SqliteStorage ones +// (except actual queries) Maybe combine them somehow? + +impl<S, D> Storage<D> for PostgresStorage<S> +where + S: Send + Sync + Serializer<D> + 'static, + D: Send + Serialize + DeserializeOwned + 'static, + <S as Serializer<D>>::Error: Debug + Display, +{ + type Error = PostgresStorageError<<S as Serializer<D>>::Error>; + + fn remove_dialogue( + self: Arc<Self>, + ChatId(chat_id): ChatId, + ) -> BoxFuture<'static, Result<(), Self::Error>> + where + D: Send + 'static, + { + Box::pin(async move { + let deleted_rows_count = + sqlx::query(include_str!("postgres_storage/queries/remove_dialogue.sql")) + .bind(chat_id) + .execute(&self.pool) + .await? + .rows_affected(); + + if deleted_rows_count == 0 { + return Err(PostgresStorageError::DialogueNotFound); + } + + Ok(()) + }) + } + + fn update_dialogue( + self: Arc<Self>, + ChatId(chat_id): ChatId, + dialogue: D, + ) -> BoxFuture<'static, Result<(), Self::Error>> + where + D: Send + 'static, + { + Box::pin(async move { + let d = + self.serializer.serialize(&dialogue).map_err(PostgresStorageError::SerdeError)?; + sqlx::query(include_str!("postgres_storage/queries/update_dialogue.sql")) + .bind(chat_id) + .bind(d) + .execute(&self.pool) + .await?; + Ok(()) + }) + } + + fn get_dialogue( + self: Arc<Self>, + chat_id: ChatId, + ) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { + Box::pin(async move { + self.clone() + .get_dialogue(chat_id) + .await? + .map(|d| self.serializer.deserialize(&d).map_err(PostgresStorageError::SerdeError)) + .transpose() + }) + } +} diff --git a/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/create_teloxide_dialogues.sql b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/create_teloxide_dialogues.sql new file mode 100644 index 00000000..3fc00d84 --- /dev/null +++ b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/create_teloxide_dialogues.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS teloxide_dialogues ( + chat_id BIGINT PRIMARY KEY, + dialogue BYTEA NOT NULL +) \ No newline at end of file diff --git a/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/get_dialogue.sql b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/get_dialogue.sql new file mode 100644 index 00000000..e5e3c834 --- /dev/null +++ b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/get_dialogue.sql @@ -0,0 +1 @@ +SELECT dialogue FROM teloxide_dialogues WHERE chat_id = $1 \ No newline at end of file diff --git a/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/remove_dialogue.sql b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/remove_dialogue.sql new file mode 100644 index 00000000..2fb0b93f --- /dev/null +++ b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/remove_dialogue.sql @@ -0,0 +1 @@ +DELETE FROM teloxide_dialogues WHERE chat_id = $1 \ No newline at end of file diff --git a/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/update_dialogue.sql b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/update_dialogue.sql new file mode 100644 index 00000000..d21f4086 --- /dev/null +++ b/crates/teloxide/src/dispatching/dialogue/storage/postgres_storage/queries/update_dialogue.sql @@ -0,0 +1,2 @@ +INSERT INTO teloxide_dialogues VALUES ($1, $2) +ON CONFLICT(chat_id) DO UPDATE SET dialogue=excluded.dialogue \ No newline at end of file