From da82e887cda98677e14e1d810b29d2d48ee1182f Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 26 Nov 2023 11:34:41 +0100 Subject: [PATCH] Update graceful shutdown example (#2355) Co-authored-by: Jonas Platte --- examples/graceful-shutdown/Cargo.toml | 7 +- examples/graceful-shutdown/src/main.rs | 205 +++++++++++++++++++------ 2 files changed, 163 insertions(+), 49 deletions(-) diff --git a/examples/graceful-shutdown/Cargo.toml b/examples/graceful-shutdown/Cargo.toml index 0644ae2c..fd9c09e3 100644 --- a/examples/graceful-shutdown/Cargo.toml +++ b/examples/graceful-shutdown/Cargo.toml @@ -6,5 +6,10 @@ publish = false [dependencies] axum = { path = "../../axum" } -hyper = { version = "1.0.0", features = ["full"] } +hyper = { version = "1.0", features = [] } +hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] } tokio = { version = "1.0", features = ["full"] } +tower = { version = "0.4", features = ["util"] } +tower-http = { version = "0.5", features = ["timeout", "trace"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/graceful-shutdown/src/main.rs b/examples/graceful-shutdown/src/main.rs index a0e45b7c..8932b54b 100644 --- a/examples/graceful-shutdown/src/main.rs +++ b/examples/graceful-shutdown/src/main.rs @@ -4,57 +4,166 @@ //! cargo run -p example-graceful-shutdown //! kill or ctrl-c //! ``` +//! +//! Supporting graceful shutdown requires a bit of boilerplate. In the future hyper-util will +//! provide convenience helpers but for now we have to use hyper directly. -// TODO -fn main() { - eprint!("this example has not yet been updated to hyper 1.0"); +use std::time::Duration; + +use axum::{extract::Request, routing::get, Router}; +use hyper::body::Incoming; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tokio::signal; +use tokio::sync::watch; +use tokio::time::sleep; +use tower::Service; +use tower_http::timeout::TimeoutLayer; +use tower_http::trace::TraceLayer; +use tracing::debug; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[tokio::main] +async fn main() { + // Enable tracing. + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "example_graceful_shutdown=debug,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Create a regular axum app. + let app = Router::new() + .route("/slow", get(|| sleep(Duration::from_secs(5)))) + .route("/forever", get(std::future::pending::<()>)) + .layer(( + TraceLayer::new_for_http(), + // Graceful shutdown will wait for outstanding requests to complete. Add a timeout so + // requests don't hang forever. + TimeoutLayer::new(Duration::from_secs(10)), + )); + + // Create a `TcpListener` using tokio. + let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap(); + + // Create a watch channel to track tasks that are handling connections and wait for them to + // complete. + let (close_tx, close_rx) = watch::channel(()); + + // Continuously accept new connections. + loop { + let (socket, remote_addr) = tokio::select! { + // Either accept a new connection... + result = listener.accept() => { + result.unwrap() + } + // ...or wait to receive a shutdown signal and stop the accept loop. + _ = shutdown_signal() => { + debug!("signal received, not accepting new connections"); + break; + } + }; + + debug!("connection {remote_addr} accepted"); + + // We don't need to call `poll_ready` because `Router` is always ready. + let tower_service = app.clone(); + + // Clone the watch receiver and move it into the task. + let close_rx = close_rx.clone(); + + // Spawn a task to handle the connection. That way we can serve multiple connections + // concurrently. + tokio::spawn(async move { + // Hyper has its own `AsyncRead` and `AsyncWrite` traits and doesn't use tokio. + // `TokioIo` converts between them. + let socket = TokioIo::new(socket); + + // Hyper also has its own `Service` trait and doesn't use tower. We can use + // `hyper::service::service_fn` to create a hyper `Service` that calls our app through + // `tower::Service::call`. + let hyper_service = hyper::service::service_fn(move |request: Request| { + // We have to clone `tower_service` because hyper's `Service` uses `&self` whereas + // tower's `Service` requires `&mut self`. + // + // We don't need to call `poll_ready` since `Router` is always ready. + tower_service.clone().call(request) + }); + + // `hyper_util::server::conn::auto::Builder` supports both http1 and http2 but doesn't + // support graceful so we have to use hyper directly and unfortunately pick between + // http1 and http2. + let conn = hyper::server::conn::http1::Builder::new() + .serve_connection(socket, hyper_service) + // `with_upgrades` is required for websockets. + .with_upgrades(); + + // `graceful_shutdown` requires a pinned connection. + let mut conn = std::pin::pin!(conn); + + loop { + tokio::select! { + // Poll the connection. This completes when the client has closed the + // connection, graceful shutdown has completed, or we encounter a TCP error. + result = conn.as_mut() => { + if let Err(err) = result { + debug!("failed to serve connection: {err:#}"); + } + break; + } + // Start graceful shutdown when we receive a shutdown signal. + // + // We use a loop to continue polling the connection to allow requests to finish + // after starting graceful shutdown. Our `Router` has `TimeoutLayer` so + // requests will finish after at most 10 seconds. + _ = shutdown_signal() => { + debug!("signal received, starting graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + } + } + + debug!("connection {remote_addr} closed"); + + // Drop the watch receiver to signal to `main` that this task is done. + drop(close_rx); + }); + } + + // We only care about the watch receivers that were moved into the tasks so close the residual + // receiver. + drop(close_rx); + + // Close the listener to stop accepting new connections. + drop(listener); + + // Wait for all tasks to complete. + debug!("waiting for {} tasks to finish", close_tx.receiver_count()); + close_tx.closed().await; } -// use axum::{response::Html, routing::get, Router}; -// use std::net::SocketAddr; -// use tokio::signal; +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; -// #[tokio::main] -// async fn main() { -// // build our application with a route -// let app = Router::new().route("/", get(handler)); + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; -// // run it -// let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); -// println!("listening on {}", addr); -// hyper::Server::bind(&addr) -// .serve(app.into_make_service()) -// .with_graceful_shutdown(shutdown_signal()) -// .await -// .unwrap(); -// } + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); -// async fn handler() -> Html<&'static str> { -// Html("

Hello, World!

") -// } - -// async fn shutdown_signal() { -// let ctrl_c = async { -// signal::ctrl_c() -// .await -// .expect("failed to install Ctrl+C handler"); -// }; - -// #[cfg(unix)] -// let terminate = async { -// signal::unix::signal(signal::unix::SignalKind::terminate()) -// .expect("failed to install signal handler") -// .recv() -// .await; -// }; - -// #[cfg(not(unix))] -// let terminate = std::future::pending::<()>(); - -// tokio::select! { -// _ = ctrl_c => {}, -// _ = terminate => {}, -// } - -// println!("signal received, starting graceful shutdown"); -// } + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } +}