Fix webhook examples (support graceful shutdown)

This commit is contained in:
Waffle 2021-05-22 01:15:32 +03:00
parent 8785b8263c
commit 76dee997e0
3 changed files with 33 additions and 15 deletions

View file

@ -1,7 +1,7 @@
// The version of Heroku ping-pong-bot, which uses a webhook to receive updates // The version of Heroku ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling. // from Telegram, instead of long polling.
use teloxide::{dispatching::update_listeners, prelude::*, types::Update}; use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use std::{convert::Infallible, env, net::SocketAddr}; use std::{convert::Infallible, env, net::SocketAddr};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -20,8 +20,8 @@ async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, In
Ok(StatusCode::INTERNAL_SERVER_ERROR) Ok(StatusCode::INTERNAL_SERVER_ERROR)
} }
pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> { pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// Heroku defines auto defines a port value // Heroku auto defines a port value
let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing"); let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing");
let port: u16 = env::var("PORT") let port: u16 = env::var("PORT")
.expect("PORT env variable missing") .expect("PORT env variable missing")
@ -48,11 +48,21 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
}) })
.recover(handle_rejection); .recover(handle_rejection);
let serve = warp::serve(server); let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let address = format!("0.0.0.0:{}", port); let addr = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
tokio::spawn(serve.run(address.parse::<SocketAddr>().unwrap())); let server = warp::serve(server);
UnboundedReceiverStream::new(rx) let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate.
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone(), None::<for<'a> fn(&'a _) -> _>)
} }
async fn run() { async fn run() {

View file

@ -1,7 +1,7 @@
// The version of ngrok ping-pong-bot, which uses a webhook to receive updates // The version of ngrok ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling. // from Telegram, instead of long polling.
use teloxide::{dispatching::update_listeners, prelude::*, types::Update}; use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use std::{convert::Infallible, net::SocketAddr}; use std::{convert::Infallible, net::SocketAddr};
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -20,10 +20,10 @@ async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, In
Ok(StatusCode::INTERNAL_SERVER_ERROR) Ok(StatusCode::INTERNAL_SERVER_ERROR)
} }
pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> { pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// You might want to specify a self-signed certificate via .certificate // You might want to specify a self-signed certificate via .certificate
// method on SetWebhook. // method on SetWebhook.
bot.set_webhook("Your HTTPS ngrok URL here. Get it by 'ngrok http 80'") bot.set_webhook("Your HTTPS ngrok URL here. Get it by `ngrok http 80`")
.await .await
.expect("Cannot setup a webhook"); .expect("Cannot setup a webhook");
@ -40,13 +40,21 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
}) })
.recover(handle_rejection); .recover(handle_rejection);
let serve = warp::serve(server); let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let addr = "127.0.0.1:80".parse::<SocketAddr>().unwrap();
let server = warp::serve(server);
let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to // You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate. // setup a self-signed TLS certificate.
tokio::spawn(serve.run("127.0.0.1:80".parse::<SocketAddr>().unwrap())); tokio::spawn(fut);
UnboundedReceiverStream::new(rx) let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone(), None::<for<'a> fn(&'a _) -> _>)
} }
async fn run() { async fn run() {

View file

@ -144,11 +144,11 @@ where
token, token,
}; };
let stop = |st: &mut State<_>| st.token.clone(); let stop_token = |st: &mut State<_>| st.token.clone();
let timeout_hint = Some(move |_: &State<_>| timeout); let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener { state, stream, stop_token: stop, timeout_hint } StatefulListener { state, stream, stop_token, timeout_hint }
} }
async fn delete_webhook_if_setup<R>(requester: &R) async fn delete_webhook_if_setup<R>(requester: &R)