Merge pull request #996 from syrtcevvi/postgres_storage

Postgres storage
This commit is contained in:
Tima Kinsart 2024-04-08 11:36:33 +00:00 committed by GitHub
commit 9f373fbd00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 310 additions and 7 deletions

View file

@ -8,7 +8,7 @@ on:
name: Continuous integration name: Continuous integration
env: env:
RUSTFLAGS: "--cfg CI_REDIS -Dwarnings" RUSTFLAGS: "--cfg CI_REDIS --cfg CI_POSTGRES -Dwarnings"
RUSTDOCFLAGS: "--cfg docsrs -Dwarnings" RUSTDOCFLAGS: "--cfg docsrs -Dwarnings"
RUST_BACKTRACE: short RUST_BACKTRACE: short
@ -27,7 +27,7 @@ env:
# - down below in a matrix # - down below in a matrix
# - `Cargo.toml` # - `Cargo.toml`
# - **/CHANGELOG.md # - **/CHANGELOG.md
rust_msrv: 1.68.0 rust_msrv: 1.70.0
CI: 1 CI: 1
@ -75,6 +75,20 @@ jobs:
test: test:
name: Test name: Test
runs-on: ubuntu-latest runs-on: ubuntu-latest
services:
# Setup Postgres for testing PostgresStorage
postgres:
image: postgres
env:
POSTGRES_USER: teloxide
POSTGRES_PASSWORD: rewrite_it_in_rust
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
strategy: strategy:
matrix: matrix:
rust: rust:
@ -94,7 +108,7 @@ jobs:
toolchain: nightly-2024-03-20 toolchain: nightly-2024-03-20
features: "--features full nightly" features: "--features full nightly"
- rust: msrv - rust: msrv
toolchain: 1.68.0 toolchain: 1.70.0
features: "--features full" features: "--features full"
steps: steps:
@ -140,7 +154,16 @@ jobs:
redis-server --port 7777 > /dev/null & redis-server --port 7777 > /dev/null &
redis-server --port 7778 > /dev/null & redis-server --port 7778 > /dev/null &
redis-server --port 7779 > /dev/null & redis-server --port 7779 > /dev/null &
- name: Install psql
run: |
sudo apt install postgresql-client -y
- name: Create PostgreSQL databases
run: |
psql -h localhost -U teloxide -c "CREATE DATABASE test_postgres_json;"
psql -h localhost -U teloxide -c "CREATE DATABASE test_postgres_bincode;"
psql -h localhost -U teloxide -c "CREATE DATABASE test_postgres_cbor;"
env:
PGPASSWORD: rewrite_it_in_rust
- name: Test unit & integration tests - name: Test unit & integration tests
run: | run: |
cargo +${{ matrix.toolchain }} test --tests --verbose ${{ matrix.features }} cargo +${{ matrix.toolchain }} test --tests --verbose ${{ matrix.features }}

View file

@ -44,6 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `filter_video_chat_ended` - `filter_video_chat_ended`
- `filter_video_chat_participants_invited` - `filter_video_chat_participants_invited`
- `filter_web_app_data` - `filter_web_app_data`
- Implement `PostgresStorage`, a persistent dialogue storage based on [PostgreSQL](https://www.postgresql.org/)([PR 996](https://github.com/teloxide/teloxide/pull/996)).
- Implement `GetChatId` for `teloxide_core::types::{Chat, ChatJoinRequest, ChatMemberUpdated}`. - Implement `GetChatId` for `teloxide_core::types::{Chat, ChatJoinRequest, ChatMemberUpdated}`.
### Fixed ### Fixed
@ -60,6 +61,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- MSRV (Minimal Supported Rust Version) was bumped from `1.64.0` to `1.68.0` ([PR 950][https://github.com/teloxide/teloxide/pull/950]) - MSRV (Minimal Supported Rust Version) was bumped from `1.64.0` to `1.68.0` ([PR 950][https://github.com/teloxide/teloxide/pull/950])
- Sqlx version was bumped from `0.6` to `0.7.3`([PR 995](https://github.com/teloxide/teloxide/pull/995)) - Sqlx version was bumped from `0.6` to `0.7.3`([PR 995](https://github.com/teloxide/teloxide/pull/995))
- Feature `sqlite-storage` was renamed to `sqlite-storage-nativetls`([PR 995](https://github.com/teloxide/teloxide/pull/995)) - Feature `sqlite-storage` was renamed to `sqlite-storage-nativetls`([PR 995](https://github.com/teloxide/teloxide/pull/995))
- MSRV (Minimal Supported Rust Version) was bumped from `1.68.0` to `1.70.0` ([PR 996][https://github.com/teloxide/teloxide/pull/996])
### Removed ### Removed

View file

@ -5,7 +5,7 @@ resolver = "2"
# The settings below will be applied to all crates in the workspace # The settings below will be applied to all crates in the workspace
[workspace.package] [workspace.package]
# MSRV (minimal supported Rust version). # MSRV (minimal supported Rust version).
rust-version = "1.68" rust-version = "1.70"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"

View file

@ -58,7 +58,7 @@ $ set TELOXIDE_TOKEN=<Your token here>
$ $env:TELOXIDE_TOKEN=<Your token here> $ $env:TELOXIDE_TOKEN=<Your token here>
``` ```
4. Make sure that your Rust compiler is up to date (`teloxide` currently requires rustc at least version 1.68): 4. Make sure that your Rust compiler is up to date (`teloxide` currently requires rustc at least version 1.70):
```bash ```bash
# If you're using stable # If you're using stable
$ rustup update stable $ rustup update stable

View file

@ -106,6 +106,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `DefaultParseMode` now also requires that the supported requests implement `Clone` (as a user you should not notice anything changing) - `DefaultParseMode` now also requires that the supported requests implement `Clone` (as a user you should not notice anything changing)
- Methods of the Message type: `delete_chat_photo`, `group_chat_created`, `super_group_chat_created`, `channel_chat_created`, `chat_migration`, `migrate_to_chat_id`, `migrate_from_chat_id` now return shared reference instead of owned value inside `Option` ([#982][pr982]) - Methods of the Message type: `delete_chat_photo`, `group_chat_created`, `super_group_chat_created`, `channel_chat_created`, `chat_migration`, `migrate_to_chat_id`, `migrate_from_chat_id` now return shared reference instead of owned value inside `Option` ([#982][pr982])
- Methods `delete_chat_photo`, `group_chat_created`, `super_group_chat_created`, `channel_chat_created` now return appropriate structs not `Option<True>` ([#982][pr982]) - Methods `delete_chat_photo`, `group_chat_created`, `super_group_chat_created`, `channel_chat_created` now return appropriate structs not `Option<True>` ([#982][pr982])
- MSRV (Minimal Supported Rust Version) was bumped from `1.68.0` to `1.70.0` ([#996][pr996])
[pr852]: https://github.com/teloxide/teloxide/pull/853 [pr852]: https://github.com/teloxide/teloxide/pull/853
[pr859]: https://github.com/teloxide/teloxide/pull/859 [pr859]: https://github.com/teloxide/teloxide/pull/859
@ -114,6 +115,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[pr892]: https://github.com/teloxide/teloxide/pull/892 [pr892]: https://github.com/teloxide/teloxide/pull/892
[pr950]: https://github.com/teloxide/teloxide/pull/950 [pr950]: https://github.com/teloxide/teloxide/pull/950
[pr961]: https://github.com/teloxide/teloxide/pull/961 [pr961]: https://github.com/teloxide/teloxide/pull/961
[pr996]: https://github.com/teloxide/teloxide/pull/996
### Deprecated ### Deprecated

View file

@ -7,7 +7,7 @@
//!```toml //!```toml
//! teloxide-core = "0.9" //! teloxide-core = "0.9"
//! ``` //! ```
//! _Compiler support: requires rustc 1.68+_. //! _Compiler support: requires rustc 1.70+_.
//! //!
//! ``` //! ```
//! # async { //! # async {

View file

@ -29,7 +29,10 @@ sqlite-storage-nativetls = [
"native-tls", "native-tls",
] ]
sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"] sqlite-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"]
postgres-storage-nativetls = ["sqlx", "sqlx/runtime-tokio-native-tls", "native-tls"]
postgres-storage-rustls = ["sqlx", "sqlx/runtime-tokio-rustls", "rustls"]
redis-storage = ["redis"] redis-storage = ["redis"]
cbor-serializer = ["serde_cbor"] cbor-serializer = ["serde_cbor"]
bincode-serializer = ["bincode"] bincode-serializer = ["bincode"]
@ -57,6 +60,7 @@ full = [
# "sqlite-storage-rustls" is explicitly ommited here, # "sqlite-storage-rustls" is explicitly ommited here,
# since it conflicts with "sqlite-storage-nativetls" # since it conflicts with "sqlite-storage-nativetls"
"redis-storage", "redis-storage",
"postgres-storage-nativetls",
"cbor-serializer", "cbor-serializer",
"bincode-serializer", "bincode-serializer",
"macros", "macros",
@ -102,6 +106,7 @@ either = "1.9.0"
sqlx = { version = "0.7.3", optional = true, default-features = false, features = [ sqlx = { version = "0.7.3", optional = true, default-features = false, features = [
"macros", "macros",
"sqlite", "sqlite",
"postgres"
] } ] }
redis = { version = "0.24", features = ["tokio-comp"], optional = true } redis = { version = "0.24", features = ["tokio-comp"], optional = true }
serde_cbor = { version = "0.11", optional = true } serde_cbor = { version = "0.11", optional = true }
@ -155,6 +160,10 @@ required-features = [
"bincode-serializer", "bincode-serializer",
] ]
[[test]]
name = "postgres"
path = "tests/postgres.rs"
required-features = ["postgres-storage-nativetls", "cbor-serializer", "bincode-serializer"]
[[example]] [[example]]
name = "admin" name = "admin"

View file

@ -99,6 +99,9 @@ pub use self::{RedisStorage, RedisStorageError};
#[cfg(any(feature = "sqlite-storage-nativetls", feature = "sqlite-storage-rustls"))] #[cfg(any(feature = "sqlite-storage-nativetls", feature = "sqlite-storage-rustls"))]
pub use self::{SqliteStorage, SqliteStorageError}; pub use self::{SqliteStorage, SqliteStorageError};
#[cfg(feature = "postgres-storage-nativetls")]
pub use self::{PostgresStorage, PostgresStorageError};
pub use get_chat_id::GetChatId; pub use get_chat_id::GetChatId;
pub use storage::*; pub use storage::*;

View file

@ -9,6 +9,9 @@ mod redis_storage;
#[cfg(any(feature = "sqlite-storage-nativetls", feature = "sqlite-storage-rustls"))] #[cfg(any(feature = "sqlite-storage-nativetls", feature = "sqlite-storage-rustls"))]
mod sqlite_storage; mod sqlite_storage;
#[cfg(feature = "postgres-storage-nativetls")]
mod postgres_storage;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use teloxide_core::types::ChatId; use teloxide_core::types::ChatId;
@ -25,6 +28,9 @@ use std::sync::Arc;
#[cfg(any(feature = "sqlite-storage-nativetls", feature = "sqlite-storage-rustls"))] #[cfg(any(feature = "sqlite-storage-nativetls", feature = "sqlite-storage-rustls"))]
pub use sqlite_storage::{SqliteStorage, SqliteStorageError}; pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
#[cfg(feature = "postgres-storage-nativetls")]
pub use postgres_storage::{PostgresStorage, PostgresStorageError};
/// A storage with an erased error type. /// A storage with an erased error type.
pub type ErasedStorage<D> = pub type ErasedStorage<D> =
dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync; dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync;

View file

@ -0,0 +1,157 @@
use std::{
convert::Infallible,
fmt::{Debug, Display},
str,
sync::Arc,
};
use futures::future::BoxFuture;
use serde::{de::DeserializeOwned, Serialize};
use sqlx::postgres::{PgPool, PgPoolOptions};
use teloxide_core::types::ChatId;
use thiserror::Error;
use super::{serializer::Serializer, Storage};
/// An error returned from [`PostgresStorage`].
#[derive(Debug, Error)]
pub enum PostgresStorageError<SE>
where
SE: Debug + Display,
{
#[error("dialogue serialization error: {0}")]
SerdeError(SE),
#[error("postgres error: {0}")]
PostgresError(#[from] sqlx::Error),
// TODO maybe add chat_id for the sake of completeness?
#[error("row not found")]
DialogueNotFound,
}
/// A persistent dialogue storage based on [PostgreSQL](https://www.postgresql.org/)
pub struct PostgresStorage<S> {
pool: PgPool,
serializer: S,
}
impl<S> PostgresStorage<S> {
/// Opens a connection pool to the [Postgres](https://www.postgresql.org/) database and creates the table
/// for storing dialogues.
///
/// Parameters:
/// - database_url: full url to the postgres database, for example
/// `"postgres://postgres:password@localhost/test")`
/// - max_connections: number of connections in creating connection pool. Be
/// mindful of the connection limits for your database, each
/// connection established with the Postgres creates a new process on the
/// server side
/// - serializer: what [`Serializer`] will be used to encode the dialogue
/// data. Available ones are: [`Json`], [`Bincode`], [`Cbor`]
///
/// [`Json`]: crate::dispatching::dialogue::serializer::Json
/// [`Bincode`]: crate::dispatching::dialogue::serializer::Bincode
/// [`Cbor`]: crate::dispatching::dialogue::serializer::Cbor
pub async fn open(
database_url: &str,
max_connections: u32,
serializer: S,
) -> Result<Arc<Self>, PostgresStorageError<Infallible>> {
let pool =
PgPoolOptions::new().max_connections(max_connections).connect(database_url).await?;
sqlx::query(include_str!("postgres_storage/queries/create_teloxide_dialogues.sql"))
.execute(&pool)
.await?;
Ok(Arc::new(Self { pool, serializer }))
}
async fn get_dialogue(
self: Arc<Self>,
ChatId(chat_id): ChatId,
) -> Result<Option<Vec<u8>>, sqlx::Error> {
#[derive(sqlx::FromRow)]
struct DialogueDbRow {
dialogue: Vec<u8>,
}
let bytes = sqlx::query_as::<_, DialogueDbRow>(include_str!(
"postgres_storage/queries/get_dialogue.sql"
))
.bind(chat_id)
.fetch_optional(&self.pool)
.await?
.map(|r| r.dialogue);
Ok(bytes)
}
}
// FIXME: these methods' bodies are almostly the same as SqliteStorage ones
// (except actual queries) Maybe combine them somehow?
impl<S, D> Storage<D> for PostgresStorage<S>
where
S: Send + Sync + Serializer<D> + 'static,
D: Send + Serialize + DeserializeOwned + 'static,
<S as Serializer<D>>::Error: Debug + Display,
{
type Error = PostgresStorageError<<S as Serializer<D>>::Error>;
fn remove_dialogue(
self: Arc<Self>,
ChatId(chat_id): ChatId,
) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static,
{
Box::pin(async move {
let deleted_rows_count =
sqlx::query(include_str!("postgres_storage/queries/remove_dialogue.sql"))
.bind(chat_id)
.execute(&self.pool)
.await?
.rows_affected();
if deleted_rows_count == 0 {
return Err(PostgresStorageError::DialogueNotFound);
}
Ok(())
})
}
fn update_dialogue(
self: Arc<Self>,
ChatId(chat_id): ChatId,
dialogue: D,
) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static,
{
Box::pin(async move {
let d =
self.serializer.serialize(&dialogue).map_err(PostgresStorageError::SerdeError)?;
sqlx::query(include_str!("postgres_storage/queries/update_dialogue.sql"))
.bind(chat_id)
.bind(d)
.execute(&self.pool)
.await?;
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: ChatId,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
self.clone()
.get_dialogue(chat_id)
.await?
.map(|d| self.serializer.deserialize(&d).map_err(PostgresStorageError::SerdeError))
.transpose()
})
}
}

View file

@ -0,0 +1,4 @@
CREATE TABLE IF NOT EXISTS teloxide_dialogues (
chat_id BIGINT PRIMARY KEY,
dialogue BYTEA NOT NULL
)

View file

@ -0,0 +1 @@
SELECT dialogue FROM teloxide_dialogues WHERE chat_id = $1

View file

@ -0,0 +1 @@
DELETE FROM teloxide_dialogues WHERE chat_id = $1

View file

@ -0,0 +1,2 @@
INSERT INTO teloxide_dialogues VALUES ($1, $2)
ON CONFLICT(chat_id) DO UPDATE SET dialogue=excluded.dialogue

View file

@ -0,0 +1,93 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use teloxide::{
dispatching::dialogue::{PostgresStorage, PostgresStorageError, Serializer, Storage},
types::ChatId,
};
// These examples are meant to run under the CI with the postgres service
// Were checked locally
#[tokio::test]
#[cfg_attr(not(CI_POSTGRES), ignore)]
async fn test_postgres_json() {
let storage = PostgresStorage::open(
"postgres://teloxide:rewrite_it_in_rust@localhost:5432/test_postgres_json",
1,
teloxide::dispatching::dialogue::serializer::Json,
)
.await
.unwrap();
test_postgres(storage).await;
}
#[tokio::test]
#[cfg_attr(not(CI_POSTGRES), ignore)]
async fn test_postgres_bincode() {
let storage = PostgresStorage::open(
"postgres://teloxide:rewrite_it_in_rust@localhost:5432/test_postgres_bincode",
1,
teloxide::dispatching::dialogue::serializer::Bincode,
)
.await
.unwrap();
test_postgres(storage).await;
}
#[tokio::test]
#[cfg_attr(not(CI_POSTGRES), ignore)]
async fn test_postgres_cbor() {
let storage = PostgresStorage::open(
"postgres://teloxide:rewrite_it_in_rust@localhost:5432/test_postgres_cbor",
1,
teloxide::dispatching::dialogue::serializer::Cbor,
)
.await
.unwrap();
test_postgres(storage).await;
}
type Dialogue = String;
macro_rules! test_dialogues {
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(1)).await.unwrap(), $_0);
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(11)).await.unwrap(), $_1);
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(256)).await.unwrap(), $_2);
};
}
async fn test_postgres<S>(storage: Arc<PostgresStorage<S>>)
where
S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display,
{
test_dialogues!(storage, None, None, None);
Arc::clone(&storage).update_dialogue(ChatId(1), "ABC".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(ChatId(11), "DEF".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(ChatId(256), "GHI".to_owned()).await.unwrap();
test_dialogues!(
storage,
Some("ABC".to_owned()),
Some("DEF".to_owned()),
Some("GHI".to_owned())
);
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap();
Arc::clone(&storage).remove_dialogue(ChatId(11)).await.unwrap();
Arc::clone(&storage).remove_dialogue(ChatId(256)).await.unwrap();
test_dialogues!(storage, None, None, None);
// Check that a try to remove a non-existing dialogue results in an error.
assert!(matches!(
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap_err(),
PostgresStorageError::DialogueNotFound
));
}