Revert "Remove buffer from BoxRoute (#270)" (#273)

This reverts commit 552d69e5d4.
This commit is contained in:
David Pedersen 2021-08-26 16:11:38 +02:00 committed by GitHub
parent 7b391d85c8
commit a0be328976
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 219 additions and 96 deletions

View file

@ -41,8 +41,6 @@ tower-layer = "0.3"
tower-http = { version = "0.1", features = ["add-extension", "map-response-body"] } tower-http = { version = "0.1", features = ["add-extension", "map-response-body"] }
sync_wrapper = "0.1.1" sync_wrapper = "0.1.1"
dyn-clone = "1.0"
# optional dependencies # optional dependencies
tokio-tungstenite = { optional = true, version = "0.15" } tokio-tungstenite = { optional = true, version = "0.15" }
sha-1 = { optional = true, version = "0.9.6" } sha-1 = { optional = true, version = "0.9.6" }

194
src/buffer.rs Normal file
View file

@ -0,0 +1,194 @@
use futures_util::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::PollSemaphore;
use tower::ServiceExt;
use tower_service::Service;
/// A version of [`tower::buffer::Buffer`] which panicks on channel related errors, thus keeping
/// the error type of the service.
pub(crate) struct MpscBuffer<S, R>
where
S: Service<R>,
{
tx: mpsc::UnboundedSender<Msg<S, R>>,
semaphore: PollSemaphore,
permit: Option<OwnedSemaphorePermit>,
}
impl<S, R> Clone for MpscBuffer<S, R>
where
S: Service<R>,
{
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
semaphore: self.semaphore.clone(),
permit: None,
}
}
}
impl<S, R> MpscBuffer<S, R>
where
S: Service<R>,
{
pub(crate) fn new(svc: S) -> Self
where
S: Send + 'static,
R: Send + 'static,
S::Error: Send + 'static,
S::Future: Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel::<Msg<S, R>>();
let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(1024)));
tokio::spawn(run_worker(svc, rx));
Self {
tx,
semaphore,
permit: None,
}
}
}
async fn run_worker<S, R>(mut svc: S, mut rx: mpsc::UnboundedReceiver<Msg<S, R>>)
where
S: Service<R>,
{
while let Some((req, reply_tx)) = rx.recv().await {
match svc.ready().await {
Ok(svc) => {
let future = svc.call(req);
let _ = reply_tx.send(WorkerReply::Future(future));
}
Err(err) => {
let _ = reply_tx.send(WorkerReply::Error(err));
}
}
}
}
type Msg<S, R> = (
R,
oneshot::Sender<WorkerReply<<S as Service<R>>::Future, <S as Service<R>>::Error>>,
);
enum WorkerReply<F, E> {
Future(F),
Error(E),
}
impl<S, R> Service<R> for MpscBuffer<S, R>
where
S: Service<R>,
{
type Response = S::Response;
type Error = S::Error;
type Future = ResponseFuture<S::Future, S::Error>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.permit.is_some() {
return Poll::Ready(Ok(()));
}
let permit = ready!(self.semaphore.poll_acquire(cx))
.expect("buffer semaphore closed. This is a bug in axum and should never happen. Please file an issue");
self.permit = Some(permit);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: R) -> Self::Future {
let permit = self
.permit
.take()
.expect("semaphore permit missing. Did you forget to call `poll_ready`?");
let (reply_tx, reply_rx) = oneshot::channel::<WorkerReply<S::Future, S::Error>>();
self.tx.send((req, reply_tx)).unwrap_or_else(|_| {
panic!("buffer worker not running. This is a bug in axum and should never happen. Please file an issue")
});
ResponseFuture {
state: State::Channel { reply_rx },
permit,
}
}
}
pin_project! {
pub(crate) struct ResponseFuture<F, E> {
#[pin]
state: State<F, E>,
permit: OwnedSemaphorePermit,
}
}
pin_project! {
#[project = StateProj]
enum State<F, E> {
Channel { reply_rx: oneshot::Receiver<WorkerReply<F, E>> },
Future { #[pin] future: F },
}
}
impl<F, E, T> Future for ResponseFuture<F, E>
where
F: Future<Output = Result<T, E>>,
{
type Output = Result<T, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let mut this = self.as_mut().project();
let new_state = match this.state.as_mut().project() {
StateProj::Channel { reply_rx } => {
let msg = ready!(Pin::new(reply_rx).poll(cx))
.expect("buffer worker not running. This is a bug in axum and should never happen. Please file an issue");
match msg {
WorkerReply::Future(future) => State::Future { future },
WorkerReply::Error(err) => return Poll::Ready(Err(err)),
}
}
StateProj::Future { future } => {
return future.poll(cx);
}
};
this.state.set(new_state);
}
}
}
#[cfg(test)]
mod tests {
#[allow(unused_imports)]
use super::*;
use tower::ServiceExt;
#[tokio::test]
async fn test_buffer() {
let mut svc = MpscBuffer::new(tower::service_fn(handle));
let res = svc.ready().await.unwrap().call(42).await.unwrap();
assert_eq!(res, "foo");
}
async fn handle(req: i32) -> Result<&'static str, std::convert::Infallible> {
assert_eq!(req, 42);
Ok("foo")
}
}

View file

@ -1004,6 +1004,7 @@
#[macro_use] #[macro_use]
pub(crate) mod macros; pub(crate) mod macros;
mod buffer;
mod error; mod error;
mod json; mod json;
mod util; mod util;

View file

@ -1,6 +1,6 @@
//! Future types. //! Future types.
use crate::{body::BoxBody, routing::FromEmptyRouter, util::CloneBoxService, BoxError}; use crate::{body::BoxBody, buffer::MpscBuffer, routing::FromEmptyRouter, BoxError};
use futures_util::ready; use futures_util::ready;
use http::{Request, Response}; use http::{Request, Response};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -11,7 +11,10 @@ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tower::{util::Oneshot, ServiceExt}; use tower::{
util::{BoxService, Oneshot},
ServiceExt,
};
use tower_service::Service; use tower_service::Service;
pub use super::or::ResponseFuture as OrResponseFuture; pub use super::or::ResponseFuture as OrResponseFuture;
@ -30,7 +33,10 @@ pin_project! {
{ {
#[pin] #[pin]
pub(super) inner: Oneshot< pub(super) inner: Oneshot<
CloneBoxService<Request<B>, Response<BoxBody>, E>, MpscBuffer<
BoxService<Request<B>, Response<BoxBody>, E >,
Request<B>
>,
Request<B>, Request<B>,
>, >,
} }

View file

@ -3,12 +3,13 @@
use self::future::{BoxRouteFuture, EmptyRouterFuture, NestedFuture, RouteFuture}; use self::future::{BoxRouteFuture, EmptyRouterFuture, NestedFuture, RouteFuture};
use crate::{ use crate::{
body::{box_body, BoxBody}, body::{box_body, BoxBody},
buffer::MpscBuffer,
extract::{ extract::{
connect_info::{Connected, IntoMakeServiceWithConnectInfo}, connect_info::{Connected, IntoMakeServiceWithConnectInfo},
OriginalUri, OriginalUri,
}, },
service::HandleError, service::HandleError,
util::{ByteStr, CloneBoxService}, util::ByteStr,
BoxError, BoxError,
}; };
use bytes::Bytes; use bytes::Bytes;
@ -23,7 +24,10 @@ use std::{
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tower::{util::ServiceExt, ServiceBuilder}; use tower::{
util::{BoxService, ServiceExt},
ServiceBuilder,
};
use tower_http::map_response_body::MapResponseBodyLayer; use tower_http::map_response_body::MapResponseBodyLayer;
use tower_layer::Layer; use tower_layer::Layer;
use tower_service::Service; use tower_service::Service;
@ -252,7 +256,7 @@ impl<S> Router<S> {
/// routes. /// routes.
pub fn boxed<ReqBody, ResBody>(self) -> Router<BoxRoute<ReqBody, S::Error>> pub fn boxed<ReqBody, ResBody>(self) -> Router<BoxRoute<ReqBody, S::Error>>
where where
S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone + Send + 'static, S: Service<Request<ReqBody>, Response = Response<ResBody>> + Send + 'static,
S::Error: Into<BoxError> + Send, S::Error: Into<BoxError> + Send,
S::Future: Send, S::Future: Send,
ReqBody: Send + 'static, ReqBody: Send + 'static,
@ -261,8 +265,9 @@ impl<S> Router<S> {
{ {
self.map(|svc| { self.map(|svc| {
ServiceBuilder::new() ServiceBuilder::new()
.layer_fn(|inner| BoxRoute { inner }) .layer_fn(BoxRoute)
.layer_fn(CloneBoxService::new) .layer_fn(MpscBuffer::new)
.layer(BoxService::layer())
.layer(MapResponseBodyLayer::new(box_body)) .layer(MapResponseBodyLayer::new(box_body))
.service(svc) .service(svc)
}) })
@ -828,15 +833,13 @@ type Captures = Vec<(String, String)>;
/// A boxed route trait object. /// A boxed route trait object.
/// ///
/// See [`Router::boxed`] for more details. /// See [`Router::boxed`] for more details.
pub struct BoxRoute<B = crate::body::Body, E = Infallible> { pub struct BoxRoute<B = crate::body::Body, E = Infallible>(
inner: CloneBoxService<Request<B>, Response<BoxBody>, E>, MpscBuffer<BoxService<Request<B>, Response<BoxBody>, E>, Request<B>>,
} );
impl<B, E> Clone for BoxRoute<B, E> { impl<B, E> Clone for BoxRoute<B, E> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
BoxRoute { Self(self.0.clone())
inner: self.inner.clone(),
}
} }
} }
@ -862,7 +865,7 @@ where
#[inline] #[inline]
fn call(&mut self, req: Request<B>) -> Self::Future { fn call(&mut self, req: Request<B>) -> Self::Future {
BoxRouteFuture { BoxRouteFuture {
inner: self.inner.clone().oneshot(req), inner: self.0.clone().oneshot(req),
} }
} }
} }

View file

@ -2,10 +2,6 @@ use bytes::Bytes;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::ops::Deref; use std::ops::Deref;
mod clone_box_service;
pub(crate) use self::clone_box_service::CloneBoxService;
/// A string like type backed by `Bytes` making it cheap to clone. /// A string like type backed by `Bytes` making it cheap to clone.
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub(crate) struct ByteStr(Bytes); pub(crate) struct ByteStr(Bytes);

View file

@ -1,75 +0,0 @@
use futures_util::future::BoxFuture;
use std::future::Future;
use std::task::{Context, Poll};
use tower::ServiceExt;
use tower_service::Service;
/// A boxed Service that implements Clone
///
/// Could probably upstream this to tower
pub(crate) struct CloneBoxService<T, U, E> {
inner: Box<
dyn CloneService<T, Response = U, Error = E, Future = BoxFuture<'static, Result<U, E>>>
+ Send,
>,
}
impl<T, U, E> CloneBoxService<T, U, E> {
pub(crate) fn new<S>(inner: S) -> Self
where
S: Service<T, Response = U, Error = E> + Clone + Send + 'static,
S::Future: Send + 'static,
{
let inner = Box::new(inner.map_future(|f| Box::pin(f) as _));
Self { inner }
}
}
impl<T, U, E> Clone for CloneBoxService<T, U, E> {
fn clone(&self) -> Self {
Self {
inner: dyn_clone::clone_box(&*self.inner),
}
}
}
impl<T, U, E> Service<T> for CloneBoxService<T, U, E> {
type Response = U;
type Error = E;
type Future = BoxFuture<'static, Result<U, E>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
CloneService::poll_ready(&mut *self.inner, cx)
}
fn call(&mut self, req: T) -> Self::Future {
CloneService::call(&mut *self.inner, req)
}
}
trait CloneService<R>: dyn_clone::DynClone {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: R) -> Self::Future;
}
impl<R, T> CloneService<R> for T
where
T: Service<R> + Clone,
{
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(self, cx)
}
fn call(&mut self, req: R) -> Self::Future {
Service::call(self, req)
}
}