mirror of
https://github.com/teloxide/teloxide.git
synced 2025-03-14 11:44:04 +01:00
Add PostgresStorage
This commit is contained in:
parent
2945f4d301
commit
25fb0eed11
8 changed files with 178 additions and 0 deletions
|
@ -25,7 +25,10 @@ webhooks-axum = ["webhooks", "axum", "tower", "tower-http"]
|
|||
|
||||
sqlite-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"]
|
||||
sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"]
|
||||
postgres-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"]
|
||||
postgres-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"]
|
||||
redis-storage = ["redis"]
|
||||
|
||||
cbor-serializer = ["serde_cbor"]
|
||||
bincode-serializer = ["bincode"]
|
||||
|
||||
|
@ -101,6 +104,7 @@ either = "1.9.0"
|
|||
sqlx = { version = "0.7.3", optional = true, default-features = false, features = [
|
||||
"macros",
|
||||
"sqlite",
|
||||
"postgres"
|
||||
] }
|
||||
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
|
||||
serde_cbor = { version = "0.11", optional = true }
|
||||
|
|
|
@ -99,6 +99,9 @@ pub use self::{RedisStorage, RedisStorageError};
|
|||
#[cfg(feature = "sqlite-storage-nativetls")]
|
||||
pub use self::{SqliteStorage, SqliteStorageError};
|
||||
|
||||
#[cfg(feature = "postgres-storage-nativetls")]
|
||||
pub use self::{PostgresStorage, PostgresStorageError};
|
||||
|
||||
pub use get_chat_id::GetChatId;
|
||||
pub use storage::*;
|
||||
|
||||
|
|
|
@ -9,6 +9,9 @@ mod redis_storage;
|
|||
#[cfg(feature = "sqlite-storage-nativetls")]
|
||||
mod sqlite_storage;
|
||||
|
||||
#[cfg(feature = "postgres-storage-nativetls")]
|
||||
mod postgres_storage;
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use teloxide_core::types::ChatId;
|
||||
|
||||
|
@ -25,6 +28,9 @@ use std::sync::Arc;
|
|||
#[cfg(feature = "sqlite-storage-nativetls")]
|
||||
pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
|
||||
|
||||
#[cfg(feature = "postgres-storage-nativetls")]
|
||||
pub use postgres_storage::{PostgresStorage, PostgresStorageError};
|
||||
|
||||
/// A storage with an erased error type.
|
||||
pub type ErasedStorage<D> =
|
||||
dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync;
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
use std::{
|
||||
convert::Infallible,
|
||||
fmt::{Debug, Display},
|
||||
str,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use sqlx::postgres::{PgPool, PgPoolOptions};
|
||||
use teloxide_core::types::ChatId;
|
||||
use thiserror::Error;
|
||||
|
||||
use super::{serializer::Serializer, Storage};
|
||||
|
||||
/// An error returned from [`PostgresStorage`].
|
||||
#[derive(Debug, Error)]
|
||||
pub enum PostgresStorageError<SE>
|
||||
where
|
||||
SE: Debug + Display,
|
||||
{
|
||||
#[error("dialogue serialization error: {0}")]
|
||||
SerdeError(SE),
|
||||
|
||||
#[error("postgres error: {0}")]
|
||||
PostgresError(#[from] sqlx::Error),
|
||||
|
||||
// TODO maybe add chat_id for the sake of completeness?
|
||||
#[error("row not found")]
|
||||
DialogueNotFound,
|
||||
}
|
||||
|
||||
/// A persistent dialogue storage based on [PostgreSQL](https://www.postgresql.org/)
|
||||
pub struct PostgresStorage<S> {
|
||||
pool: PgPool,
|
||||
serializer: S,
|
||||
}
|
||||
|
||||
impl<S> PostgresStorage<S> {
|
||||
/// Opens a connection pool to the [Postgres](https://www.postgresql.org/) database and creates the table
|
||||
/// for storing dialogues.
|
||||
///
|
||||
/// Parameters:
|
||||
/// - database_url: full url to the postgres database, for example
|
||||
/// `"postgres://postgres:password@localhost/test")`
|
||||
/// - max_connections: number of connections in creating connection pool. Be
|
||||
/// mindful of the connection limits for your database, each
|
||||
/// connection established with the Postgres creates a new process on the
|
||||
/// server side
|
||||
/// - serializer: what [`Serializer`] will be used to encode the dialogue
|
||||
/// data. Available ones are: [`Json`], [`Bincode`], [`Cbor`]
|
||||
///
|
||||
/// [`Json`]: crate::dispatching::dialogue::serializer::Json
|
||||
/// [`Bincode`]: crate::dispatching::dialogue::serializer::Bincode
|
||||
/// [`Cbor`]: crate::dispatching::dialogue::serializer::Cbor
|
||||
pub async fn open(
|
||||
database_url: &str,
|
||||
max_connections: u32,
|
||||
serializer: S,
|
||||
) -> Result<Arc<Self>, PostgresStorageError<Infallible>> {
|
||||
let pool =
|
||||
PgPoolOptions::new().max_connections(max_connections).connect(database_url).await?;
|
||||
sqlx::query(include_str!("postgres_storage/queries/create_teloxide_dialogues.sql"))
|
||||
.execute(&pool)
|
||||
.await?;
|
||||
|
||||
Ok(Arc::new(Self { pool, serializer }))
|
||||
}
|
||||
|
||||
async fn get_dialogue(
|
||||
self: Arc<Self>,
|
||||
ChatId(chat_id): ChatId,
|
||||
) -> Result<Option<Vec<u8>>, sqlx::Error> {
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct DialogueDbRow {
|
||||
dialogue: Vec<u8>,
|
||||
}
|
||||
|
||||
let bytes = sqlx::query_as::<_, DialogueDbRow>(include_str!(
|
||||
"postgres_storage/queries/get_dialogue.sql"
|
||||
))
|
||||
.bind(chat_id)
|
||||
.fetch_optional(&self.pool)
|
||||
.await?
|
||||
.map(|r| r.dialogue);
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: these methods' bodies are almostly the same as SqliteStorage ones
|
||||
// (except actual queries) Maybe combine them somehow?
|
||||
|
||||
impl<S, D> Storage<D> for PostgresStorage<S>
|
||||
where
|
||||
S: Send + Sync + Serializer<D> + 'static,
|
||||
D: Send + Serialize + DeserializeOwned + 'static,
|
||||
<S as Serializer<D>>::Error: Debug + Display,
|
||||
{
|
||||
type Error = PostgresStorageError<<S as Serializer<D>>::Error>;
|
||||
|
||||
fn remove_dialogue(
|
||||
self: Arc<Self>,
|
||||
ChatId(chat_id): ChatId,
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let deleted_rows_count =
|
||||
sqlx::query(include_str!("postgres_storage/queries/remove_dialogue.sql"))
|
||||
.bind(chat_id)
|
||||
.execute(&self.pool)
|
||||
.await?
|
||||
.rows_affected();
|
||||
|
||||
if deleted_rows_count == 0 {
|
||||
return Err(PostgresStorageError::DialogueNotFound);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn update_dialogue(
|
||||
self: Arc<Self>,
|
||||
ChatId(chat_id): ChatId,
|
||||
dialogue: D,
|
||||
) -> BoxFuture<'static, Result<(), Self::Error>>
|
||||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
Box::pin(async move {
|
||||
let d =
|
||||
self.serializer.serialize(&dialogue).map_err(PostgresStorageError::SerdeError)?;
|
||||
sqlx::query(include_str!("postgres_storage/queries/update_dialogue.sql"))
|
||||
.bind(chat_id)
|
||||
.bind(d)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn get_dialogue(
|
||||
self: Arc<Self>,
|
||||
chat_id: ChatId,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||
Box::pin(async move {
|
||||
self.clone()
|
||||
.get_dialogue(chat_id)
|
||||
.await?
|
||||
.map(|d| self.serializer.deserialize(&d).map_err(PostgresStorageError::SerdeError))
|
||||
.transpose()
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
CREATE TABLE IF NOT EXISTS teloxide_dialogues (
|
||||
chat_id BIGINT PRIMARY KEY,
|
||||
dialogue BYTEA NOT NULL
|
||||
)
|
|
@ -0,0 +1 @@
|
|||
SELECT dialogue FROM teloxide_dialogues WHERE chat_id = $1
|
|
@ -0,0 +1 @@
|
|||
DELETE FROM teloxide_dialogues WHERE chat_id = $1
|
|
@ -0,0 +1,2 @@
|
|||
INSERT INTO teloxide_dialogues VALUES ($1, $2)
|
||||
ON CONFLICT(chat_id) DO UPDATE SET dialogue=excluded.dialogue
|
Loading…
Add table
Reference in a new issue