mirror of
https://github.com/tokio-rs/axum.git
synced 2025-04-26 13:56:22 +02:00
Guarantee return value of serve
, Pt. 2 (#3166)
This commit is contained in:
parent
d07d129baf
commit
28c6be74ae
1 changed files with 102 additions and 76 deletions
|
@ -80,6 +80,12 @@ pub use self::listener::{Listener, ListenerExt, TapIo};
|
|||
/// See also [`HandlerWithoutStateExt::into_make_service_with_connect_info`] and
|
||||
/// [`HandlerService::into_make_service_with_connect_info`].
|
||||
///
|
||||
/// # Return Value
|
||||
///
|
||||
/// Although this future resolves to `io::Result<()>`, it will never actually complete or return an
|
||||
/// error. Errors on the TCP socket will be handled by sleeping for a short while (currently, one
|
||||
/// second).
|
||||
///
|
||||
/// [`Router`]: crate::Router
|
||||
/// [`Router::into_make_service_with_connect_info`]: crate::Router::into_make_service_with_connect_info
|
||||
/// [`MethodRouter`]: crate::routing::MethodRouter
|
||||
|
@ -137,6 +143,11 @@ where
|
|||
/// // ...
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// # Return Value
|
||||
///
|
||||
/// Similarly to [`serve`], although this future resolves to `io::Result<()>`, it will never
|
||||
/// error. It returns `Ok(())` only after the `signal` future completes.
|
||||
pub fn with_graceful_shutdown<F>(self, signal: F) -> WithGracefulShutdown<L, M, S, F>
|
||||
where
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
|
@ -255,6 +266,25 @@ where
|
|||
type IntoFuture = private::ServeFuture;
|
||||
|
||||
fn into_future(self) -> Self::IntoFuture {
|
||||
private::ServeFuture(Box::pin(async move {
|
||||
self.run().await;
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
|
||||
impl<L, M, S, F> WithGracefulShutdown<L, M, S, F>
|
||||
where
|
||||
L: Listener,
|
||||
L::Addr: Debug,
|
||||
M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
|
||||
for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
|
||||
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
|
||||
S::Future: Send,
|
||||
F: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
async fn run(self) {
|
||||
let Self {
|
||||
mut listener,
|
||||
mut make_service,
|
||||
|
@ -262,91 +292,87 @@ where
|
|||
_marker: _,
|
||||
} = self;
|
||||
|
||||
private::ServeFuture(Box::pin(async move {
|
||||
let (signal_tx, signal_rx) = watch::channel(());
|
||||
let signal_tx = Arc::new(signal_tx);
|
||||
let (signal_tx, signal_rx) = watch::channel(());
|
||||
let signal_tx = Arc::new(signal_tx);
|
||||
tokio::spawn(async move {
|
||||
signal.await;
|
||||
trace!("received graceful shutdown signal. Telling tasks to shutdown");
|
||||
drop(signal_rx);
|
||||
});
|
||||
|
||||
let (close_tx, close_rx) = watch::channel(());
|
||||
|
||||
loop {
|
||||
let (io, remote_addr) = tokio::select! {
|
||||
conn = listener.accept() => conn,
|
||||
_ = signal_tx.closed() => {
|
||||
trace!("signal received, not accepting new connections");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let io = TokioIo::new(io);
|
||||
|
||||
trace!("connection {remote_addr:?} accepted");
|
||||
|
||||
poll_fn(|cx| make_service.poll_ready(cx))
|
||||
.await
|
||||
.unwrap_or_else(|err| match err {});
|
||||
|
||||
let tower_service = make_service
|
||||
.call(IncomingStream {
|
||||
io: &io,
|
||||
remote_addr,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err| match err {})
|
||||
.map_request(|req: Request<Incoming>| req.map(Body::new));
|
||||
|
||||
let hyper_service = TowerToHyperService::new(tower_service);
|
||||
|
||||
let signal_tx = Arc::clone(&signal_tx);
|
||||
|
||||
let close_rx = close_rx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
signal.await;
|
||||
trace!("received graceful shutdown signal. Telling tasks to shutdown");
|
||||
drop(signal_rx);
|
||||
});
|
||||
#[allow(unused_mut)]
|
||||
let mut builder = Builder::new(TokioExecutor::new());
|
||||
// CONNECT protocol needed for HTTP/2 websockets
|
||||
#[cfg(feature = "http2")]
|
||||
builder.http2().enable_connect_protocol();
|
||||
let conn = builder.serve_connection_with_upgrades(io, hyper_service);
|
||||
pin_mut!(conn);
|
||||
|
||||
let (close_tx, close_rx) = watch::channel(());
|
||||
let signal_closed = signal_tx.closed().fuse();
|
||||
pin_mut!(signal_closed);
|
||||
|
||||
loop {
|
||||
let (io, remote_addr) = tokio::select! {
|
||||
conn = listener.accept() => conn,
|
||||
_ = signal_tx.closed() => {
|
||||
trace!("signal received, not accepting new connections");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let io = TokioIo::new(io);
|
||||
|
||||
trace!("connection {remote_addr:?} accepted");
|
||||
|
||||
poll_fn(|cx| make_service.poll_ready(cx))
|
||||
.await
|
||||
.unwrap_or_else(|err| match err {});
|
||||
|
||||
let tower_service = make_service
|
||||
.call(IncomingStream {
|
||||
io: &io,
|
||||
remote_addr,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|err| match err {})
|
||||
.map_request(|req: Request<Incoming>| req.map(Body::new));
|
||||
|
||||
let hyper_service = TowerToHyperService::new(tower_service);
|
||||
|
||||
let signal_tx = Arc::clone(&signal_tx);
|
||||
|
||||
let close_rx = close_rx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
#[allow(unused_mut)]
|
||||
let mut builder = Builder::new(TokioExecutor::new());
|
||||
// CONNECT protocol needed for HTTP/2 websockets
|
||||
#[cfg(feature = "http2")]
|
||||
builder.http2().enable_connect_protocol();
|
||||
let conn = builder.serve_connection_with_upgrades(io, hyper_service);
|
||||
pin_mut!(conn);
|
||||
|
||||
let signal_closed = signal_tx.closed().fuse();
|
||||
pin_mut!(signal_closed);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = conn.as_mut() => {
|
||||
if let Err(_err) = result {
|
||||
trace!("failed to serve connection: {_err:#}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
_ = &mut signal_closed => {
|
||||
trace!("signal received in task, starting graceful shutdown");
|
||||
conn.as_mut().graceful_shutdown();
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = conn.as_mut() => {
|
||||
if let Err(_err) = result {
|
||||
trace!("failed to serve connection: {_err:#}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
_ = &mut signal_closed => {
|
||||
trace!("signal received in task, starting graceful shutdown");
|
||||
conn.as_mut().graceful_shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(close_rx);
|
||||
});
|
||||
}
|
||||
drop(close_rx);
|
||||
});
|
||||
}
|
||||
|
||||
drop(close_rx);
|
||||
drop(listener);
|
||||
drop(close_rx);
|
||||
drop(listener);
|
||||
|
||||
trace!(
|
||||
"waiting for {} task(s) to finish",
|
||||
close_tx.receiver_count()
|
||||
);
|
||||
close_tx.closed().await;
|
||||
|
||||
Ok(())
|
||||
}))
|
||||
trace!(
|
||||
"waiting for {} task(s) to finish",
|
||||
close_tx.receiver_count()
|
||||
);
|
||||
close_tx.closed().await;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue