From a6d3e1e9b115e7d794e056becdc136c89bb8bb63 Mon Sep 17 00:00:00 2001 From: Lev Khoroshansky Date: Mon, 28 Nov 2022 15:42:12 +0100 Subject: [PATCH 1/3] deps: Bump `axum` to `0.6.0` --- crates/teloxide/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/teloxide/Cargo.toml b/crates/teloxide/Cargo.toml index 05a6367f..7c9e6634 100644 --- a/crates/teloxide/Cargo.toml +++ b/crates/teloxide/Cargo.toml @@ -94,7 +94,7 @@ sqlx = { version = "0.6", optional = true, default-features = false, features = redis = { version = "0.21", features = ["tokio-comp"], optional = true } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } -axum = { version = "0.5.13", optional = true } +axum = { version = "0.6.0", optional = true } tower = { version = "0.4.12", optional = true } tower-http = { version = "0.3.4", features = ["trace"], optional = true } rand = { version = "0.8.5", optional = true } From b6c27db50e26ded117aa69cd2f4396be9e537be4 Mon Sep 17 00:00:00 2001 From: Lev Khoroshansky Date: Mon, 28 Nov 2022 15:42:51 +0100 Subject: [PATCH 2/3] fix: Adjust to `axum@0.6.0` --- .../update_listeners/webhooks/axum.rs | 66 ++++++++++--------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/crates/teloxide/src/dispatching/update_listeners/webhooks/axum.rs b/crates/teloxide/src/dispatching/update_listeners/webhooks/axum.rs index 7bbe03ba..45cb5e79 100644 --- a/crates/teloxide/src/dispatching/update_listeners/webhooks/axum.rs +++ b/crates/teloxide/src/dispatching/update_listeners/webhooks/axum.rs @@ -1,14 +1,16 @@ use std::{convert::Infallible, future::Future, pin::Pin}; use axum::{ - extract::{FromRequest, RequestParts}, - http::status::StatusCode, + extract::{FromRequestParts, State}, + http::{request::Parts, status::StatusCode}, }; +use tokio::sync::mpsc; use crate::{ dispatching::update_listeners::{webhooks::Options, UpdateListener}, requests::Requester, stop::StopFlag, + types::Update, }; /// Webhook implementation based on the [mod@axum] framework. @@ -156,25 +158,17 @@ pub fn axum_no_setup( use crate::{ dispatching::update_listeners::{self, webhooks::tuple_first_mut}, stop::{mk_stop_token, StopToken}, - types::Update, }; - use axum::{extract::Extension, response::IntoResponse, routing::post}; - use tokio::sync::mpsc; + use axum::{response::IntoResponse, routing::post}; use tokio_stream::wrappers::UnboundedReceiverStream; - use tower::ServiceBuilder; use tower_http::trace::TraceLayer; - type Sender = mpsc::UnboundedSender>; - type CSender = ClosableSender>; - - let (tx, rx): (Sender, _) = mpsc::unbounded_channel(); + let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel(); async fn telegram_request( - input: String, + State(WebhookState { secret, flag, mut tx }): State, secret_header: XTelegramBotApiSecretToken, - secret: Extension>, - tx: Extension, - flag: Extension, + input: String, ) -> impl IntoResponse { // FIXME: use constant time comparison here if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) { @@ -186,7 +180,7 @@ pub fn axum_no_setup( // Do not process updates after `.stop()` is called even if the server is still // running (useful for when you need to stop the bot but can't stop the server). _ if flag.is_stopped() => { - { tx.0 }.close(); + tx.close(); return StatusCode::SERVICE_UNAVAILABLE; } Some(tx) => tx, @@ -212,14 +206,14 @@ pub fn axum_no_setup( let (stop_token, stop_flag) = mk_stop_token(); - let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer( - ServiceBuilder::new() - .layer(TraceLayer::new_for_http()) - .layer(Extension(ClosableSender::new(tx))) - .layer(Extension(stop_flag.clone())) - .layer(Extension(options.secret_token)) - .into_inner(), - ); + let app = axum::Router::new() + .route(options.url.path(), post(telegram_request)) + .layer(TraceLayer::new_for_http()) + .with_state(WebhookState { + tx: ClosableSender::new(tx), + flag: stop_flag.clone(), + secret: options.secret_token, + }); let stream = UnboundedReceiverStream::new(rx); @@ -233,9 +227,19 @@ pub fn axum_no_setup( (listener, stop_flag, app) } +type UpdateSender = mpsc::UnboundedSender>; +type UpdateCSender = ClosableSender>; + +#[derive(Clone)] +struct WebhookState { + tx: UpdateCSender, + flag: StopFlag, + secret: Option, +} + /// A terrible workaround to drop axum extension struct ClosableSender { - origin: std::sync::Arc>>>, + origin: std::sync::Arc>>>, } impl Clone for ClosableSender { @@ -245,11 +249,11 @@ impl Clone for ClosableSender { } impl ClosableSender { - fn new(sender: tokio::sync::mpsc::UnboundedSender) -> Self { + fn new(sender: mpsc::UnboundedSender) -> Self { Self { origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))) } } - fn get(&self) -> Option> { + fn get(&self) -> Option> { self.origin.read().unwrap().clone() } @@ -260,20 +264,22 @@ impl ClosableSender { struct XTelegramBotApiSecretToken(Option>); -impl FromRequest for XTelegramBotApiSecretToken { +impl FromRequestParts for XTelegramBotApiSecretToken { type Rejection = StatusCode; - fn from_request<'l0, 'at>( - req: &'l0 mut RequestParts, + fn from_request_parts<'l0, 'l1, 'at>( + req: &'l0 mut Parts, + _state: &'l1 S, ) -> Pin> + Send + 'at>> where 'l0: 'at, + 'l1: 'at, Self: 'at, { use crate::dispatching::update_listeners::webhooks::check_secret; let res = req - .headers_mut() + .headers .remove("x-telegram-bot-api-secret-token") .map(|header| { check_secret(header.as_bytes()) From d89dfa77c1d6125fa257792085655a29b9be8e91 Mon Sep 17 00:00:00 2001 From: Lev Khoroshansky Date: Mon, 28 Nov 2022 15:43:54 +0100 Subject: [PATCH 3/3] chore: Sync changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fbeeff0..5348b4ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## unreleased +### Changed + +- Updated `axum` to v0.6.0. + ## Removed - `rocksdb-storage` feature and associated items (See [PR #761](https://github.com/teloxide/teloxide/pull/761) for reasoning) [**BC**]