mirror of
https://github.com/tokio-rs/axum.git
synced 2025-01-01 08:56:15 +01:00
Remove buffer from BoxRoute
(#270)
Boxing a service normally means using `tower::util::BoxService`. That doesn't implement `Clone` however so normally I had been combining it with `Buffer` to get that. But recently I discovered https://github.com/dtolnay/dyn-clone which makes it possible to clone trait objects. So this adds a new internal utility called `CloneBoxService` which replaces the previous `BoxService` + `Buffer` combo in `BoxRoute`. I'll investigate upstreaming that to tower. I think it makes sense there since box + clone is quite a common need.
This commit is contained in:
parent
20f6c3b509
commit
552d69e5d4
7 changed files with 96 additions and 219 deletions
|
@ -41,6 +41,8 @@ tower-layer = "0.3"
|
||||||
tower-http = { version = "0.1", features = ["add-extension", "map-response-body"] }
|
tower-http = { version = "0.1", features = ["add-extension", "map-response-body"] }
|
||||||
sync_wrapper = "0.1.1"
|
sync_wrapper = "0.1.1"
|
||||||
|
|
||||||
|
dyn-clone = "1.0"
|
||||||
|
|
||||||
# optional dependencies
|
# optional dependencies
|
||||||
tokio-tungstenite = { optional = true, version = "0.15" }
|
tokio-tungstenite = { optional = true, version = "0.15" }
|
||||||
sha-1 = { optional = true, version = "0.9.6" }
|
sha-1 = { optional = true, version = "0.9.6" }
|
||||||
|
|
194
src/buffer.rs
194
src/buffer.rs
|
@ -1,194 +0,0 @@
|
||||||
use futures_util::ready;
|
|
||||||
use pin_project_lite::pin_project;
|
|
||||||
use std::{
|
|
||||||
future::Future,
|
|
||||||
pin::Pin,
|
|
||||||
sync::Arc,
|
|
||||||
task::{Context, Poll},
|
|
||||||
};
|
|
||||||
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
|
|
||||||
use tokio_util::sync::PollSemaphore;
|
|
||||||
use tower::ServiceExt;
|
|
||||||
use tower_service::Service;
|
|
||||||
|
|
||||||
/// A version of [`tower::buffer::Buffer`] which panicks on channel related errors, thus keeping
|
|
||||||
/// the error type of the service.
|
|
||||||
pub(crate) struct MpscBuffer<S, R>
|
|
||||||
where
|
|
||||||
S: Service<R>,
|
|
||||||
{
|
|
||||||
tx: mpsc::UnboundedSender<Msg<S, R>>,
|
|
||||||
semaphore: PollSemaphore,
|
|
||||||
permit: Option<OwnedSemaphorePermit>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, R> Clone for MpscBuffer<S, R>
|
|
||||||
where
|
|
||||||
S: Service<R>,
|
|
||||||
{
|
|
||||||
fn clone(&self) -> Self {
|
|
||||||
Self {
|
|
||||||
tx: self.tx.clone(),
|
|
||||||
semaphore: self.semaphore.clone(),
|
|
||||||
permit: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, R> MpscBuffer<S, R>
|
|
||||||
where
|
|
||||||
S: Service<R>,
|
|
||||||
{
|
|
||||||
pub(crate) fn new(svc: S) -> Self
|
|
||||||
where
|
|
||||||
S: Send + 'static,
|
|
||||||
R: Send + 'static,
|
|
||||||
S::Error: Send + 'static,
|
|
||||||
S::Future: Send + 'static,
|
|
||||||
{
|
|
||||||
let (tx, rx) = mpsc::unbounded_channel::<Msg<S, R>>();
|
|
||||||
let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(1024)));
|
|
||||||
|
|
||||||
tokio::spawn(run_worker(svc, rx));
|
|
||||||
|
|
||||||
Self {
|
|
||||||
tx,
|
|
||||||
semaphore,
|
|
||||||
permit: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn run_worker<S, R>(mut svc: S, mut rx: mpsc::UnboundedReceiver<Msg<S, R>>)
|
|
||||||
where
|
|
||||||
S: Service<R>,
|
|
||||||
{
|
|
||||||
while let Some((req, reply_tx)) = rx.recv().await {
|
|
||||||
match svc.ready().await {
|
|
||||||
Ok(svc) => {
|
|
||||||
let future = svc.call(req);
|
|
||||||
let _ = reply_tx.send(WorkerReply::Future(future));
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
let _ = reply_tx.send(WorkerReply::Error(err));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type Msg<S, R> = (
|
|
||||||
R,
|
|
||||||
oneshot::Sender<WorkerReply<<S as Service<R>>::Future, <S as Service<R>>::Error>>,
|
|
||||||
);
|
|
||||||
|
|
||||||
enum WorkerReply<F, E> {
|
|
||||||
Future(F),
|
|
||||||
Error(E),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S, R> Service<R> for MpscBuffer<S, R>
|
|
||||||
where
|
|
||||||
S: Service<R>,
|
|
||||||
{
|
|
||||||
type Response = S::Response;
|
|
||||||
type Error = S::Error;
|
|
||||||
type Future = ResponseFuture<S::Future, S::Error>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
|
||||||
if self.permit.is_some() {
|
|
||||||
return Poll::Ready(Ok(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
let permit = ready!(self.semaphore.poll_acquire(cx))
|
|
||||||
.expect("buffer semaphore closed. This is a bug in axum and should never happen. Please file an issue");
|
|
||||||
|
|
||||||
self.permit = Some(permit);
|
|
||||||
|
|
||||||
Poll::Ready(Ok(()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn call(&mut self, req: R) -> Self::Future {
|
|
||||||
let permit = self
|
|
||||||
.permit
|
|
||||||
.take()
|
|
||||||
.expect("semaphore permit missing. Did you forget to call `poll_ready`?");
|
|
||||||
|
|
||||||
let (reply_tx, reply_rx) = oneshot::channel::<WorkerReply<S::Future, S::Error>>();
|
|
||||||
|
|
||||||
self.tx.send((req, reply_tx)).unwrap_or_else(|_| {
|
|
||||||
panic!("buffer worker not running. This is a bug in axum and should never happen. Please file an issue")
|
|
||||||
});
|
|
||||||
|
|
||||||
ResponseFuture {
|
|
||||||
state: State::Channel { reply_rx },
|
|
||||||
permit,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
pub(crate) struct ResponseFuture<F, E> {
|
|
||||||
#[pin]
|
|
||||||
state: State<F, E>,
|
|
||||||
permit: OwnedSemaphorePermit,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pin_project! {
|
|
||||||
#[project = StateProj]
|
|
||||||
enum State<F, E> {
|
|
||||||
Channel { reply_rx: oneshot::Receiver<WorkerReply<F, E>> },
|
|
||||||
Future { #[pin] future: F },
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F, E, T> Future for ResponseFuture<F, E>
|
|
||||||
where
|
|
||||||
F: Future<Output = Result<T, E>>,
|
|
||||||
{
|
|
||||||
type Output = Result<T, E>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
loop {
|
|
||||||
let mut this = self.as_mut().project();
|
|
||||||
|
|
||||||
let new_state = match this.state.as_mut().project() {
|
|
||||||
StateProj::Channel { reply_rx } => {
|
|
||||||
let msg = ready!(Pin::new(reply_rx).poll(cx))
|
|
||||||
.expect("buffer worker not running. This is a bug in axum and should never happen. Please file an issue");
|
|
||||||
|
|
||||||
match msg {
|
|
||||||
WorkerReply::Future(future) => State::Future { future },
|
|
||||||
WorkerReply::Error(err) => return Poll::Ready(Err(err)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
StateProj::Future { future } => {
|
|
||||||
return future.poll(cx);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
this.state.set(new_state);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
#[allow(unused_imports)]
|
|
||||||
use super::*;
|
|
||||||
use tower::ServiceExt;
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_buffer() {
|
|
||||||
let mut svc = MpscBuffer::new(tower::service_fn(handle));
|
|
||||||
|
|
||||||
let res = svc.ready().await.unwrap().call(42).await.unwrap();
|
|
||||||
|
|
||||||
assert_eq!(res, "foo");
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle(req: i32) -> Result<&'static str, std::convert::Infallible> {
|
|
||||||
assert_eq!(req, 42);
|
|
||||||
Ok("foo")
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1004,7 +1004,6 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
pub(crate) mod macros;
|
pub(crate) mod macros;
|
||||||
|
|
||||||
mod buffer;
|
|
||||||
mod error;
|
mod error;
|
||||||
mod json;
|
mod json;
|
||||||
mod util;
|
mod util;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
//! Future types.
|
//! Future types.
|
||||||
|
|
||||||
use crate::{body::BoxBody, buffer::MpscBuffer, routing::FromEmptyRouter, BoxError};
|
use crate::{body::BoxBody, routing::FromEmptyRouter, util::CloneBoxService, BoxError};
|
||||||
use futures_util::ready;
|
use futures_util::ready;
|
||||||
use http::{Request, Response};
|
use http::{Request, Response};
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
@ -11,10 +11,7 @@ use std::{
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tower::{
|
use tower::{util::Oneshot, ServiceExt};
|
||||||
util::{BoxService, Oneshot},
|
|
||||||
ServiceExt,
|
|
||||||
};
|
|
||||||
use tower_service::Service;
|
use tower_service::Service;
|
||||||
|
|
||||||
pub use super::or::ResponseFuture as OrResponseFuture;
|
pub use super::or::ResponseFuture as OrResponseFuture;
|
||||||
|
@ -33,10 +30,7 @@ pin_project! {
|
||||||
{
|
{
|
||||||
#[pin]
|
#[pin]
|
||||||
pub(super) inner: Oneshot<
|
pub(super) inner: Oneshot<
|
||||||
MpscBuffer<
|
CloneBoxService<Request<B>, Response<BoxBody>, E>,
|
||||||
BoxService<Request<B>, Response<BoxBody>, E >,
|
|
||||||
Request<B>
|
|
||||||
>,
|
|
||||||
Request<B>,
|
Request<B>,
|
||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,13 +3,12 @@
|
||||||
use self::future::{BoxRouteFuture, EmptyRouterFuture, NestedFuture, RouteFuture};
|
use self::future::{BoxRouteFuture, EmptyRouterFuture, NestedFuture, RouteFuture};
|
||||||
use crate::{
|
use crate::{
|
||||||
body::{box_body, BoxBody},
|
body::{box_body, BoxBody},
|
||||||
buffer::MpscBuffer,
|
|
||||||
extract::{
|
extract::{
|
||||||
connect_info::{Connected, IntoMakeServiceWithConnectInfo},
|
connect_info::{Connected, IntoMakeServiceWithConnectInfo},
|
||||||
OriginalUri,
|
OriginalUri,
|
||||||
},
|
},
|
||||||
service::HandleError,
|
service::HandleError,
|
||||||
util::ByteStr,
|
util::{ByteStr, CloneBoxService},
|
||||||
BoxError,
|
BoxError,
|
||||||
};
|
};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
@ -24,10 +23,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tower::{
|
use tower::{util::ServiceExt, ServiceBuilder};
|
||||||
util::{BoxService, ServiceExt},
|
|
||||||
ServiceBuilder,
|
|
||||||
};
|
|
||||||
use tower_http::map_response_body::MapResponseBodyLayer;
|
use tower_http::map_response_body::MapResponseBodyLayer;
|
||||||
use tower_layer::Layer;
|
use tower_layer::Layer;
|
||||||
use tower_service::Service;
|
use tower_service::Service;
|
||||||
|
@ -256,7 +252,7 @@ impl<S> Router<S> {
|
||||||
/// routes.
|
/// routes.
|
||||||
pub fn boxed<ReqBody, ResBody>(self) -> Router<BoxRoute<ReqBody, S::Error>>
|
pub fn boxed<ReqBody, ResBody>(self) -> Router<BoxRoute<ReqBody, S::Error>>
|
||||||
where
|
where
|
||||||
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Send + 'static,
|
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static,
|
||||||
S::Error: Into<BoxError> + Send,
|
S::Error: Into<BoxError> + Send,
|
||||||
S::Future: Send,
|
S::Future: Send,
|
||||||
ReqBody: Send + 'static,
|
ReqBody: Send + 'static,
|
||||||
|
@ -265,9 +261,8 @@ impl<S> Router<S> {
|
||||||
{
|
{
|
||||||
self.map(|svc| {
|
self.map(|svc| {
|
||||||
ServiceBuilder::new()
|
ServiceBuilder::new()
|
||||||
.layer_fn(BoxRoute)
|
.layer_fn(|inner| BoxRoute { inner })
|
||||||
.layer_fn(MpscBuffer::new)
|
.layer_fn(CloneBoxService::new)
|
||||||
.layer(BoxService::layer())
|
|
||||||
.layer(MapResponseBodyLayer::new(box_body))
|
.layer(MapResponseBodyLayer::new(box_body))
|
||||||
.service(svc)
|
.service(svc)
|
||||||
})
|
})
|
||||||
|
@ -833,13 +828,15 @@ type Captures = Vec<(String, String)>;
|
||||||
/// A boxed route trait object.
|
/// A boxed route trait object.
|
||||||
///
|
///
|
||||||
/// See [`Router::boxed`] for more details.
|
/// See [`Router::boxed`] for more details.
|
||||||
pub struct BoxRoute<B = crate::body::Body, E = Infallible>(
|
pub struct BoxRoute<B = crate::body::Body, E = Infallible> {
|
||||||
MpscBuffer<BoxService<Request<B>, Response<BoxBody>, E>, Request<B>>,
|
inner: CloneBoxService<Request<B>, Response<BoxBody>, E>,
|
||||||
);
|
}
|
||||||
|
|
||||||
impl<B, E> Clone for BoxRoute<B, E> {
|
impl<B, E> Clone for BoxRoute<B, E> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self(self.0.clone())
|
BoxRoute {
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -865,7 +862,7 @@ where
|
||||||
#[inline]
|
#[inline]
|
||||||
fn call(&mut self, req: Request<B>) -> Self::Future {
|
fn call(&mut self, req: Request<B>) -> Self::Future {
|
||||||
BoxRouteFuture {
|
BoxRouteFuture {
|
||||||
inner: self.0.clone().oneshot(req),
|
inner: self.inner.clone().oneshot(req),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
75
src/util/clone_box_service.rs
Normal file
75
src/util/clone_box_service.rs
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
use futures_util::future::BoxFuture;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tower::ServiceExt;
|
||||||
|
use tower_service::Service;
|
||||||
|
|
||||||
|
/// A boxed Service that implements Clone
|
||||||
|
///
|
||||||
|
/// Could probably upstream this to tower
|
||||||
|
pub(crate) struct CloneBoxService<T, U, E> {
|
||||||
|
inner: Box<
|
||||||
|
dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>>
|
||||||
|
+ Send,
|
||||||
|
>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, E> CloneBoxService<T, U, E> {
|
||||||
|
pub(crate) fn new<S>(inner: S) -> Self
|
||||||
|
where
|
||||||
|
S: Service<T, Response = U, Error = E> + Clone + Send + 'static,
|
||||||
|
S::Future: Send + 'static,
|
||||||
|
{
|
||||||
|
let inner = Box::new(inner.map_future(|f| Box::pin(f) as _));
|
||||||
|
Self { inner }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, E> Clone for CloneBoxService<T, U, E> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: dyn_clone::clone_box(&*self.inner),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, U, E> Service<T> for CloneBoxService<T, U, E> {
|
||||||
|
type Response = U;
|
||||||
|
type Error = E;
|
||||||
|
type Future = BoxFuture<'static, Result<U, E>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
CloneService::poll_ready(&mut *self.inner, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: T) -> Self::Future {
|
||||||
|
CloneService::call(&mut *self.inner, req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait CloneService<R>: dyn_clone::DynClone {
|
||||||
|
type Response;
|
||||||
|
type Error;
|
||||||
|
type Future: Future<Output = Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
|
||||||
|
|
||||||
|
fn call(&mut self, req: R) -> Self::Future;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R, T> CloneService<R> for T
|
||||||
|
where
|
||||||
|
T: Service<R> + Clone,
|
||||||
|
{
|
||||||
|
type Response = T::Response;
|
||||||
|
type Error = T::Error;
|
||||||
|
type Future = T::Future;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
Service::poll_ready(self, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, req: R) -> Self::Future {
|
||||||
|
Service::call(self, req)
|
||||||
|
}
|
||||||
|
}
|
|
@ -2,6 +2,10 @@ use bytes::Bytes;
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
|
||||||
|
mod clone_box_service;
|
||||||
|
|
||||||
|
pub(crate) use self::clone_box_service::CloneBoxService;
|
||||||
|
|
||||||
/// A string like type backed by `Bytes` making it cheap to clone.
|
/// A string like type backed by `Bytes` making it cheap to clone.
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
pub(crate) struct ByteStr(Bytes);
|
pub(crate) struct ByteStr(Bytes);
|
Loading…
Reference in a new issue