From b86b2f67f373b78f5d4adba3b94f0a8f7b131212 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Fri, 18 Mar 2022 02:39:56 +0400 Subject: [PATCH] Add built-in support for webhooks This commit adds built-in support for webhooks based on the `axum` framework under a new "webhooks-axum" feature. API: `update_listeners::webhooks` module with `Options` (a common struct that stores all webhook related settings) and `axum` (function that implements webhook). --- Cargo.toml | 7 + src/dispatching/update_listeners.rs | 3 + src/dispatching/update_listeners/webhooks.rs | 163 +++++++++++++++++++ 3 files changed, 173 insertions(+) create mode 100644 src/dispatching/update_listeners/webhooks.rs diff --git a/Cargo.toml b/Cargo.toml index bc4b7cd4..de64cb58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,8 @@ exclude = ["media"] # FIXME: remove "cache-me" that was added by mistake here default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send", "cache-me"] +webhooks-axum = ["axum", "tower", "tower-http"] + sqlite-storage = ["sqlx"] redis-storage = ["redis"] cbor-serializer = ["serde_cbor"] @@ -37,6 +39,7 @@ erased = ["teloxide-core/erased"] nightly = ["teloxide-core/nightly"] full = [ + "webhooks-axum", "sqlite-storage", "redis-storage", "cbor-serializer", @@ -65,6 +68,7 @@ dptree = { version = "0.1.0" } tokio = { version = "1.8", features = ["fs"] } tokio-util = "0.6" +url = "2.2.2" log = "0.4" bytes = "1.0" mime = "0.3" @@ -84,6 +88,9 @@ sqlx = { version = "0.5", optional = true, default-features = false, features = redis = { version = "0.20", features = ["tokio-comp"], optional = true } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } +axum = { version = "0.4.8", optional = true } +tower = { version = "0.4.12", optional = true } +tower-http = { version = "0.2.5", features = ["trace"], optional = true } [dev-dependencies] rand = "0.8.3" diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index b2afa015..4f3c01b5 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -23,6 +23,9 @@ //! [long polling]: https://en.wikipedia.org/wiki/Push_technology#Long_polling //! [webhooks]: https://en.wikipedia.org/wiki/Webhook +#[cfg(any(feature = "webhooks-axum"))] +pub mod webhooks; + use futures::Stream; use std::time::Duration; diff --git a/src/dispatching/update_listeners/webhooks.rs b/src/dispatching/update_listeners/webhooks.rs new file mode 100644 index 00000000..de8240ad --- /dev/null +++ b/src/dispatching/update_listeners/webhooks.rs @@ -0,0 +1,163 @@ +use std::{convert::Infallible, net::SocketAddr}; + +use crate::{dispatching::update_listeners::UpdateListener, requests::Requester, types::InputFile}; + +/// Options related to setting up webhooks. +pub struct Options { + /// Local address to listen to. + pub address: SocketAddr, + + /// Public url that Telegram will send updates to. + /// + /// Note: + /// - At the time of writing only ports 443, 80, 88 and 8443 [are + /// supported][set_webhook] + /// - This url must be forwarded to the [address][addr] in order for webhook + /// to work + /// - This url should be kept private, otherwise malicious actors can + /// pretend to be Telegram and send fake updates to your bot + /// + /// [set_webhook]: https://core.telegram.org/bots/api#setwebhook + /// [addr]: (self::Options.address) + pub url: url::Url, + + /// Upload your public key certificate so that the root certificate in use + /// can be checked. See Telegram's [self-signed guide] for details. + /// + /// [self-signed guide]: https://core.telegram.org/bots/self-signed + /// + /// Default - None. + pub certificate: Option, + + /// Pass `true` to drop all pending updates. + /// + /// Default - None. + pub drop_pending_updates: Option, +} + +impl Options { + /// Construct a new webhook options, see [`Options.address`] and + /// [`Options.url`] for details. + pub fn new(address: SocketAddr, url: url::Url) -> Self { + Self { address, url, certificate: None, drop_pending_updates: None } + } + + /// Upload your public key certificate so that the root certificate in use + /// can be checked. See Telegram's [self-signed guide] for details. + /// + /// [self-signed guide]: https://core.telegram.org/bots/self-signed + pub fn certificate(self, v: InputFile) -> Self { + Self { certificate: Some(v), ..self } + } + + /// Drop all pending updates before setting up webhook. + pub fn drop_pending_updates(self) -> Self { + Self { drop_pending_updates: Some(true), ..self } + } +} + +/// Webhook implementation based on the [axum] framework. +/// +/// ## Panics +/// +/// If binding to the [address] fails. +/// +/// [address]: Options.address +/// +/// ## Errors +/// +/// If `set_webhook()` fails. +#[cfg(feature = "webhooks-axum")] +pub async fn axum(bot: R, options: Options) -> Result, R::Err> +where + R: Requester + Send + 'static, + ::DeleteWebhook: Send, +{ + use crate::{ + dispatching::{stop_token::AsyncStopToken, update_listeners}, + requests::Request, + types::Update, + }; + use axum::{ + extract::Extension, http::StatusCode, response::IntoResponse, routing::post, + AddExtensionLayer, + }; + use futures::FutureExt; + use teloxide_core::requests::HasPayload; + use tokio::sync::mpsc; + use tokio_stream::wrappers::UnboundedReceiverStream; + use tower::ServiceBuilder; + use tower_http::trace::TraceLayer; + + type Sender = mpsc::UnboundedSender>; + + let Options { address, url, certificate, drop_pending_updates } = options; + + { + let mut req = bot.set_webhook(url.clone()); + req.payload_mut().certificate = certificate; + req.payload_mut().drop_pending_updates = drop_pending_updates; + + req.send().await?; + } + + let (tx, rx): (Sender, _) = mpsc::unbounded_channel(); + + async fn telegram_request(input: String, tx: Extension) -> impl IntoResponse { + match serde_json::from_str(&input) { + Ok(update) => { + tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") + } + Err(error) => { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide/issues.", + error, + input + ); + } + }; + + StatusCode::OK + } + + let app = axum::Router::new().route(url.path(), post(telegram_request)).layer( + ServiceBuilder::new() + .layer(TraceLayer::new_for_http()) + .layer(AddExtensionLayer::new(tx)) + .into_inner(), + ); + + let (stop_token, stop_flag) = AsyncStopToken::new_pair(); + + tokio::spawn(async move { + axum::Server::bind(&address) + .serve(app.into_make_service()) + .with_graceful_shutdown(stop_flag.then(move |()| async move { + // This assignment is needed to not require `R: Sync` since without it `&bot` + // temporary lives across `.await` points. + let req = bot.delete_webhook().send(); + let res = req.await; + if let Err(err) = res { + log::error!("Couldn't delete webhook: {}", err); + } + })) + .await + .expect("Axum server error") + }); + + let stream = UnboundedReceiverStream::new(rx); + + fn streamf(state: &mut (S, T)) -> &mut S { + &mut state.0 + } + + let listener = update_listeners::StatefulListener::new( + (stream, stop_token), + streamf, + |state: &mut (_, AsyncStopToken)| state.1.clone(), + ); + + Ok(listener) +}