fix: Adjust to axum@0.6.0

This commit is contained in:
Lev Khoroshansky 2022-11-28 15:42:51 +01:00
parent a6d3e1e9b1
commit b6c27db50e

View file

@ -1,14 +1,16 @@
use std::{convert::Infallible, future::Future, pin::Pin}; use std::{convert::Infallible, future::Future, pin::Pin};
use axum::{ use axum::{
extract::{FromRequest, RequestParts}, extract::{FromRequestParts, State},
http::status::StatusCode, http::{request::Parts, status::StatusCode},
}; };
use tokio::sync::mpsc;
use crate::{ use crate::{
dispatching::update_listeners::{webhooks::Options, UpdateListener}, dispatching::update_listeners::{webhooks::Options, UpdateListener},
requests::Requester, requests::Requester,
stop::StopFlag, stop::StopFlag,
types::Update,
}; };
/// Webhook implementation based on the [mod@axum] framework. /// Webhook implementation based on the [mod@axum] framework.
@ -156,25 +158,17 @@ pub fn axum_no_setup(
use crate::{ use crate::{
dispatching::update_listeners::{self, webhooks::tuple_first_mut}, dispatching::update_listeners::{self, webhooks::tuple_first_mut},
stop::{mk_stop_token, StopToken}, stop::{mk_stop_token, StopToken},
types::Update,
}; };
use axum::{extract::Extension, response::IntoResponse, routing::post}; use axum::{response::IntoResponse, routing::post};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
type Sender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>; let (tx, rx): (UpdateSender, _) = mpsc::unbounded_channel();
type CSender = ClosableSender<Result<Update, std::convert::Infallible>>;
let (tx, rx): (Sender, _) = mpsc::unbounded_channel();
async fn telegram_request( async fn telegram_request(
input: String, State(WebhookState { secret, flag, mut tx }): State<WebhookState>,
secret_header: XTelegramBotApiSecretToken, secret_header: XTelegramBotApiSecretToken,
secret: Extension<Option<String>>, input: String,
tx: Extension<CSender>,
flag: Extension<StopFlag>,
) -> impl IntoResponse { ) -> impl IntoResponse {
// FIXME: use constant time comparison here // FIXME: use constant time comparison here
if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) { if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
@ -186,7 +180,7 @@ pub fn axum_no_setup(
// Do not process updates after `.stop()` is called even if the server is still // Do not process updates after `.stop()` is called even if the server is still
// running (useful for when you need to stop the bot but can't stop the server). // running (useful for when you need to stop the bot but can't stop the server).
_ if flag.is_stopped() => { _ if flag.is_stopped() => {
{ tx.0 }.close(); tx.close();
return StatusCode::SERVICE_UNAVAILABLE; return StatusCode::SERVICE_UNAVAILABLE;
} }
Some(tx) => tx, Some(tx) => tx,
@ -212,14 +206,14 @@ pub fn axum_no_setup(
let (stop_token, stop_flag) = mk_stop_token(); let (stop_token, stop_flag) = mk_stop_token();
let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer( let app = axum::Router::new()
ServiceBuilder::new() .route(options.url.path(), post(telegram_request))
.layer(TraceLayer::new_for_http()) .layer(TraceLayer::new_for_http())
.layer(Extension(ClosableSender::new(tx))) .with_state(WebhookState {
.layer(Extension(stop_flag.clone())) tx: ClosableSender::new(tx),
.layer(Extension(options.secret_token)) flag: stop_flag.clone(),
.into_inner(), secret: options.secret_token,
); });
let stream = UnboundedReceiverStream::new(rx); let stream = UnboundedReceiverStream::new(rx);
@ -233,9 +227,19 @@ pub fn axum_no_setup(
(listener, stop_flag, app) (listener, stop_flag, app)
} }
type UpdateSender = mpsc::UnboundedSender<Result<Update, std::convert::Infallible>>;
type UpdateCSender = ClosableSender<Result<Update, std::convert::Infallible>>;
#[derive(Clone)]
struct WebhookState {
tx: UpdateCSender,
flag: StopFlag,
secret: Option<String>,
}
/// A terrible workaround to drop axum extension /// A terrible workaround to drop axum extension
struct ClosableSender<T> { struct ClosableSender<T> {
origin: std::sync::Arc<std::sync::RwLock<Option<tokio::sync::mpsc::UnboundedSender<T>>>>, origin: std::sync::Arc<std::sync::RwLock<Option<mpsc::UnboundedSender<T>>>>,
} }
impl<T> Clone for ClosableSender<T> { impl<T> Clone for ClosableSender<T> {
@ -245,11 +249,11 @@ impl<T> Clone for ClosableSender<T> {
} }
impl<T> ClosableSender<T> { impl<T> ClosableSender<T> {
fn new(sender: tokio::sync::mpsc::UnboundedSender<T>) -> Self { fn new(sender: mpsc::UnboundedSender<T>) -> Self {
Self { origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))) } Self { origin: std::sync::Arc::new(std::sync::RwLock::new(Some(sender))) }
} }
fn get(&self) -> Option<tokio::sync::mpsc::UnboundedSender<T>> { fn get(&self) -> Option<mpsc::UnboundedSender<T>> {
self.origin.read().unwrap().clone() self.origin.read().unwrap().clone()
} }
@ -260,20 +264,22 @@ impl<T> ClosableSender<T> {
struct XTelegramBotApiSecretToken(Option<Vec<u8>>); struct XTelegramBotApiSecretToken(Option<Vec<u8>>);
impl<B> FromRequest<B> for XTelegramBotApiSecretToken { impl<S> FromRequestParts<S> for XTelegramBotApiSecretToken {
type Rejection = StatusCode; type Rejection = StatusCode;
fn from_request<'l0, 'at>( fn from_request_parts<'l0, 'l1, 'at>(
req: &'l0 mut RequestParts<B>, req: &'l0 mut Parts,
_state: &'l1 S,
) -> Pin<Box<dyn Future<Output = Result<Self, Self::Rejection>> + Send + 'at>> ) -> Pin<Box<dyn Future<Output = Result<Self, Self::Rejection>> + Send + 'at>>
where where
'l0: 'at, 'l0: 'at,
'l1: 'at,
Self: 'at, Self: 'at,
{ {
use crate::dispatching::update_listeners::webhooks::check_secret; use crate::dispatching::update_listeners::webhooks::check_secret;
let res = req let res = req
.headers_mut() .headers
.remove("x-telegram-bot-api-secret-token") .remove("x-telegram-bot-api-secret-token")
.map(|header| { .map(|header| {
check_secret(header.as_bytes()) check_secret(header.as_bytes())