mirror of
https://github.com/teloxide/teloxide.git
synced 2025-03-22 06:45:37 +01:00
commit
c312523375
3 changed files with 41 additions and 31 deletions
|
@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
## unreleased
|
## unreleased
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Updated `axum` to v0.6.0.
|
||||||
|
|
||||||
## Removed
|
## Removed
|
||||||
|
|
||||||
- `rocksdb-storage` feature and associated items (See [PR #761](https://github.com/teloxide/teloxide/pull/761) for reasoning) [**BC**]
|
- `rocksdb-storage` feature and associated items (See [PR #761](https://github.com/teloxide/teloxide/pull/761) for reasoning) [**BC**]
|
||||||
|
|
|
@ -94,7 +94,7 @@ sqlx = { version = "0.6", optional = true, default-features = false, features =
|
||||||
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
|
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
|
||||||
serde_cbor = { version = "0.11", optional = true }
|
serde_cbor = { version = "0.11", optional = true }
|
||||||
bincode = { version = "1.3", optional = true }
|
bincode = { version = "1.3", optional = true }
|
||||||
axum = { version = "0.5.13", optional = true }
|
axum = { version = "0.6.0", optional = true }
|
||||||
tower = { version = "0.4.12", optional = true }
|
tower = { version = "0.4.12", optional = true }
|
||||||
tower-http = { version = "0.3.4", features = ["trace"], optional = true }
|
tower-http = { version = "0.3.4", features = ["trace"], optional = true }
|
||||||
rand = { version = "0.8.5", optional = true }
|
rand = { version = "0.8.5", optional = true }
|
||||||
|
|
|
@ -1,14 +1,16 @@
|
||||||
use std::{convert::Infallible, future::Future, pin::Pin};
|
use std::{convert::Infallible, future::Future, pin::Pin};
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{FromRequest, RequestParts},
|
extract::{FromRequestParts, State},
|
||||||
http::status::StatusCode,
|
http::{request::Parts, status::StatusCode},
|
||||||
};
|
};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
dispatching::update_listeners::{webhooks::Options, UpdateListener},
|
dispatching::update_listeners::{webhooks::Options, UpdateListener},
|
||||||
requests::Requester,
|
requests::Requester,
|
||||||
stop::StopFlag,
|
stop::StopFlag,
|
||||||
|
types::Update,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Webhook implementation based on the [mod@axum] framework.
|
/// Webhook implementation based on the [mod@axum] framework.
|
||||||
|
@ -156,25 +158,17 @@ pub fn axum_no_setup(
|
||||||
use crate::{
|
use crate::{
|
||||||
dispatching::update_listeners::{self, webhooks::tuple_first_mut},
|
dispatching::update_listeners::{self, webhooks::tuple_first_mut},
|
||||||
stop::{mk_stop_token, StopToken},
|
stop::{mk_stop_token, StopToken},
|
||||||
types::Update,
|
|
||||||
};
|
};
|
||||||
use axum::{extract::Extension, response::IntoResponse, routing::post};
|
use axum::{response::IntoResponse, routing::post};
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
use tower::ServiceBuilder;
|
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
|
|
||||||
type Sender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>;
|
let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel();
|
||||||
type CSender = ClosableSender<Result<Update, std::convert::Infallible>>;
|
|
||||||
|
|
||||||
let (tx, rx): (Sender, _) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
async fn telegram_request(
|
async fn telegram_request(
|
||||||
input: String,
|
State(WebhookState { secret, flag, mut tx }): State<WebhookState>,
|
||||||
secret_header: XTelegramBotApiSecretToken,
|
secret_header: XTelegramBotApiSecretToken,
|
||||||
secret: Extension<Option<String>>,
|
input: String,
|
||||||
tx: Extension<CSender>,
|
|
||||||
flag: Extension<StopFlag>,
|
|
||||||
) -> impl IntoResponse {
|
) -> impl IntoResponse {
|
||||||
// FIXME: use constant time comparison here
|
// FIXME: use constant time comparison here
|
||||||
if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
|
if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
|
||||||
|
@ -186,7 +180,7 @@ pub fn axum_no_setup(
|
||||||
// Do not process updates after `.stop()` is called even if the server is still
|
// Do not process updates after `.stop()` is called even if the server is still
|
||||||
// running (useful for when you need to stop the bot but can't stop the server).
|
// running (useful for when you need to stop the bot but can't stop the server).
|
||||||
_ if flag.is_stopped() => {
|
_ if flag.is_stopped() => {
|
||||||
{ tx.0 }.close();
|
tx.close();
|
||||||
return StatusCode::SERVICE_UNAVAILABLE;
|
return StatusCode::SERVICE_UNAVAILABLE;
|
||||||
}
|
}
|
||||||
Some(tx) => tx,
|
Some(tx) => tx,
|
||||||
|
@ -212,14 +206,14 @@ pub fn axum_no_setup(
|
||||||
|
|
||||||
let (stop_token, stop_flag) = mk_stop_token();
|
let (stop_token, stop_flag) = mk_stop_token();
|
||||||
|
|
||||||
let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer(
|
let app = axum::Router::new()
|
||||||
ServiceBuilder::new()
|
.route(options.url.path(), post(telegram_request))
|
||||||
.layer(TraceLayer::new_for_http())
|
.layer(TraceLayer::new_for_http())
|
||||||
.layer(Extension(ClosableSender::new(tx)))
|
.with_state(WebhookState {
|
||||||
.layer(Extension(stop_flag.clone()))
|
tx: ClosableSender::new(tx),
|
||||||
.layer(Extension(options.secret_token))
|
flag: stop_flag.clone(),
|
||||||
.into_inner(),
|
secret: options.secret_token,
|
||||||
);
|
});
|
||||||
|
|
||||||
let stream = UnboundedReceiverStream::new(rx);
|
let stream = UnboundedReceiverStream::new(rx);
|
||||||
|
|
||||||
|
@ -233,9 +227,19 @@ pub fn axum_no_setup(
|
||||||
(listener, stop_flag, app)
|
(listener, stop_flag, app)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type UpdateSender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>;
|
||||||
|
type UpdateCSender = ClosableSender<Result<Update, std::convert::Infallible>>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct WebhookState {
|
||||||
|
tx: UpdateCSender,
|
||||||
|
flag: StopFlag,
|
||||||
|
secret: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
/// A terrible workaround to drop axum extension
|
/// A terrible workaround to drop axum extension
|
||||||
struct ClosableSender<T> {
|
struct ClosableSender<T> {
|
||||||
origin: std::sync::Arc<std::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<T>>>>,
|
origin: std::sync::Arc<std::sync::RwLock<Option<mpsc::UnboundedSender<T>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for ClosableSender<T> {
|
impl<T> Clone for ClosableSender<T> {
|
||||||
|
@ -245,11 +249,11 @@ impl<T> Clone for ClosableSender<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ClosableSender<T> {
|
impl<T> ClosableSender<T> {
|
||||||
fn new(sender: tokio::sync::mpsc::UnboundedSender<T>) -> Self {
|
fn new(sender: mpsc::UnboundedSender<T>) -> Self {
|
||||||
Self { origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))) }
|
Self { origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))) }
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get(&self) -> Option<tokio::sync::mpsc::UnboundedSender<T>> {
|
fn get(&self) -> Option<mpsc::UnboundedSender<T>> {
|
||||||
self.origin.read().unwrap().clone()
|
self.origin.read().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,20 +264,22 @@ impl<T> ClosableSender<T> {
|
||||||
|
|
||||||
struct XTelegramBotApiSecretToken(Option<Vec<u8>>);
|
struct XTelegramBotApiSecretToken(Option<Vec<u8>>);
|
||||||
|
|
||||||
impl<B> FromRequest<B> for XTelegramBotApiSecretToken {
|
impl<S> FromRequestParts<S> for XTelegramBotApiSecretToken {
|
||||||
type Rejection = StatusCode;
|
type Rejection = StatusCode;
|
||||||
|
|
||||||
fn from_request<'l0, 'at>(
|
fn from_request_parts<'l0, 'l1, 'at>(
|
||||||
req: &'l0 mut RequestParts<B>,
|
req: &'l0 mut Parts,
|
||||||
|
_state: &'l1 S,
|
||||||
) -> Pin<Box<dyn Future<Output = Result<Self, Self::Rejection>> + Send + 'at>>
|
) -> Pin<Box<dyn Future<Output = Result<Self, Self::Rejection>> + Send + 'at>>
|
||||||
where
|
where
|
||||||
'l0: 'at,
|
'l0: 'at,
|
||||||
|
'l1: 'at,
|
||||||
Self: 'at,
|
Self: 'at,
|
||||||
{
|
{
|
||||||
use crate::dispatching::update_listeners::webhooks::check_secret;
|
use crate::dispatching::update_listeners::webhooks::check_secret;
|
||||||
|
|
||||||
let res = req
|
let res = req
|
||||||
.headers_mut()
|
.headers
|
||||||
.remove("x-telegram-bot-api-secret-token")
|
.remove("x-telegram-bot-api-secret-token")
|
||||||
.map(|header| {
|
.map(|header| {
|
||||||
check_secret(header.as_bytes())
|
check_secret(header.as_bytes())
|
||||||
|
|
Loading…
Add table
Reference in a new issue