From 2bdfcc9ca60ce5e4f7906442954a955a37cc225b Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Fri, 25 Mar 2022 17:15:11 +0400 Subject: [PATCH] Move axum webhook impl into a sub-mod --- src/dispatching/update_listeners/webhooks.rs | 146 +---------------- .../update_listeners/webhooks/axum.rs | 149 ++++++++++++++++++ 2 files changed, 153 insertions(+), 142 deletions(-) create mode 100644 src/dispatching/update_listeners/webhooks/axum.rs diff --git a/src/dispatching/update_listeners/webhooks.rs b/src/dispatching/update_listeners/webhooks.rs index 6f43ef12..41df9085 100644 --- a/src/dispatching/update_listeners/webhooks.rs +++ b/src/dispatching/update_listeners/webhooks.rs @@ -1,10 +1,6 @@ -use std::{convert::Infallible, net::SocketAddr}; +use std::net::SocketAddr; -use crate::{ - dispatching::{stop_token::StopToken, update_listeners::UpdateListener}, - requests::Requester, - types::InputFile, -}; +use crate::{requests::Requester, types::InputFile}; /// Options related to setting up webhooks. pub struct Options { @@ -60,145 +56,11 @@ impl Options { } } -/// Webhook implementation based on the [axum] framework. -/// -/// ## Panics -/// -/// If binding to the [address] fails. -/// -/// [address]: Options.address -/// -/// ## Errors -/// -/// If `set_webhook()` fails. #[cfg(feature = "webhooks-axum")] -pub async fn axum( - bot: R, - options: Options, -) -> Result, R::Err> -where - R: Requester + Send + 'static, - ::DeleteWebhook: Send, -{ - let Options { address, .. } = options; - - let (mut update_listener, stop_flag, app) = axum_to_router(bot, options).await?; - let stop_token = update_listener.stop_token(); - - tokio::spawn(async move { - axum::Server::bind(&address) - .serve(app.into_make_service()) - .with_graceful_shutdown(stop_flag) - .await - .map_err(|err| { - stop_token.stop(); - err - }) - .expect("Axum server error"); - }); - - Ok(update_listener) -} +pub use self::axum::{axum, axum_no_setup, axum_to_router}; #[cfg(feature = "webhooks-axum")] -pub async fn axum_to_router( - bot: R, - mut options: Options, -) -> Result< - ( - impl UpdateListener, - impl std::future::Future + Send, - axum::Router, - ), - R::Err, -> -where - R: Requester + Send, - ::DeleteWebhook: Send, -{ - use crate::requests::Request; - use futures::FutureExt; - - setup_webhook(&bot, &mut options).await?; - - let (listener, stop_flag, router) = axum_no_setup(options); - - let stop_flag = stop_flag.then(move |()| async move { - // This assignment is needed to not require `R: Sync` since without it `&bot` - // temporary lives across `.await` points. - let req = bot.delete_webhook().send(); - let res = req.await; - if let Err(err) = res { - log::error!("Couldn't delete webhook: {}", err); - } - }); - - Ok((listener, stop_flag, router)) -} - -#[cfg(feature = "webhooks-axum")] -pub fn axum_no_setup( - options: Options, -) -> ( - impl UpdateListener, - impl std::future::Future, - axum::Router, -) { - use crate::{ - dispatching::{stop_token::AsyncStopToken, update_listeners}, - types::Update, - }; - use axum::{ - extract::Extension, http::StatusCode, response::IntoResponse, routing::post, - AddExtensionLayer, - }; - use tokio::sync::mpsc; - use tokio_stream::wrappers::UnboundedReceiverStream; - use tower::ServiceBuilder; - use tower_http::trace::TraceLayer; - - type Sender = mpsc::UnboundedSender>; - - let (tx, rx): (Sender, _) = mpsc::unbounded_channel(); - - async fn telegram_request(input: String, tx: Extension) -> impl IntoResponse { - match serde_json::from_str(&input) { - Ok(update) => { - tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") - } - Err(error) => { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide/issues.", - error, - input - ); - } - }; - - StatusCode::OK - } - - let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer( - ServiceBuilder::new() - .layer(TraceLayer::new_for_http()) - .layer(AddExtensionLayer::new(tx)) - .into_inner(), - ); - - let (stop_token, stop_flag) = AsyncStopToken::new_pair(); - - let stream = UnboundedReceiverStream::new(rx); - - let listener = update_listeners::StatefulListener::new( - (stream, stop_token), - tuple_first_mut, - |state: &mut (_, AsyncStopToken)| state.1.clone(), - ); - - (listener, stop_flag, app) -} +mod axum; async fn setup_webhook(bot: R, options: &mut Options) -> Result<(), R::Err> where diff --git a/src/dispatching/update_listeners/webhooks/axum.rs b/src/dispatching/update_listeners/webhooks/axum.rs new file mode 100644 index 00000000..24ef5f11 --- /dev/null +++ b/src/dispatching/update_listeners/webhooks/axum.rs @@ -0,0 +1,149 @@ +use std::convert::Infallible; + +use crate::{ + dispatching::{ + stop_token::StopToken, + update_listeners::{ + webhooks::{setup_webhook, tuple_first_mut, Options}, + UpdateListener, + }, + }, + requests::Requester, +}; + +/// Webhook implementation based on the [axum] framework. +/// +/// ## Panics +/// +/// If binding to the [address] fails. +/// +/// [address]: Options.address +/// +/// ## Errors +/// +/// If `set_webhook()` fails. +pub async fn axum( + bot: R, + options: Options, +) -> Result, R::Err> +where + R: Requester + Send + 'static, + ::DeleteWebhook: Send, +{ + let Options { address, .. } = options; + + let (mut update_listener, stop_flag, app) = axum_to_router(bot, options).await?; + let stop_token = update_listener.stop_token(); + + tokio::spawn(async move { + axum::Server::bind(&address) + .serve(app.into_make_service()) + .with_graceful_shutdown(stop_flag) + .await + .map_err(|err| { + stop_token.stop(); + err + }) + .expect("Axum server error"); + }); + + Ok(update_listener) +} + +pub async fn axum_to_router( + bot: R, + mut options: Options, +) -> Result< + ( + impl UpdateListener, + impl std::future::Future + Send, + axum::Router, + ), + R::Err, +> +where + R: Requester + Send, + ::DeleteWebhook: Send, +{ + use crate::requests::Request; + use futures::FutureExt; + + setup_webhook(&bot, &mut options).await?; + + let (listener, stop_flag, router) = axum_no_setup(options); + + let stop_flag = stop_flag.then(move |()| async move { + // This assignment is needed to not require `R: Sync` since without it `&bot` + // temporary lives across `.await` points. + let req = bot.delete_webhook().send(); + let res = req.await; + if let Err(err) = res { + log::error!("Couldn't delete webhook: {}", err); + } + }); + + Ok((listener, stop_flag, router)) +} + +pub fn axum_no_setup( + options: Options, +) -> ( + impl UpdateListener, + impl std::future::Future, + axum::Router, +) { + use crate::{ + dispatching::{stop_token::AsyncStopToken, update_listeners}, + types::Update, + }; + use axum::{ + extract::Extension, http::StatusCode, response::IntoResponse, routing::post, + AddExtensionLayer, + }; + use tokio::sync::mpsc; + use tokio_stream::wrappers::UnboundedReceiverStream; + use tower::ServiceBuilder; + use tower_http::trace::TraceLayer; + + type Sender = mpsc::UnboundedSender>; + + let (tx, rx): (Sender, _) = mpsc::unbounded_channel(); + + async fn telegram_request(input: String, tx: Extension) -> impl IntoResponse { + match serde_json::from_str(&input) { + Ok(update) => { + tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") + } + Err(error) => { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide/issues.", + error, + input + ); + } + }; + + StatusCode::OK + } + + let app = axum::Router::new().route(options.url.path(), post(telegram_request)).layer( + ServiceBuilder::new() + .layer(TraceLayer::new_for_http()) + .layer(AddExtensionLayer::new(tx)) + .into_inner(), + ); + + let (stop_token, stop_flag) = AsyncStopToken::new_pair(); + + let stream = UnboundedReceiverStream::new(rx); + + let listener = update_listeners::StatefulListener::new( + (stream, stop_token), + tuple_first_mut, + |state: &mut (_, AsyncStopToken)| state.1.clone(), + ); + + (listener, stop_flag, app) +}