Remove generic parameter from BodyStream (#234)

I think `BodyStream` is more useful without being generic over the
request body.

I'm also looking into adding a response body from a stream called
`StreamBody` which will work pretty much opposite to this.
This commit is contained in:
David Pedersen 2021-08-22 11:33:38 +02:00 committed by GitHub
parent add3dc36f9
commit 80a8355eff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 14 deletions

View file

@ -51,6 +51,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
`tower::make::Shared` ([#229](https://github.com/tokio-rs/axum/pull/229))
- All usage of `tower::BoxError` has been replaced with `axum::BoxError` ([#229](https://github.com/tokio-rs/axum/pull/229))
- `tower::util::Either` no longer implements `IntoResponse` ([#229](https://github.com/tokio-rs/axum/pull/229))
- `extract::BodyStream` is no longer generic over the request body ([#234](https://github.com/tokio-rs/axum/pull/234))
- `extract::Body` has been renamed to `extract::RawBody` to avoid conflicting
with `body::Body`
- These future types have been moved

View file

@ -259,7 +259,7 @@
//! .route(
//! "/body-stream",
//! // same for `extract::BodyStream`
//! get(|_: extract::BodyStream<MyBody<Body>>| async {}),
//! get(|_: extract::BodyStream| async {}),
//! )
//! .route(
//! // and `Request<_>`

View file

@ -1,14 +1,17 @@
use super::{rejection::*, take_body, Extension, FromRequest, RequestParts};
use crate::BoxError;
use crate::{BoxError, Error};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::Stream;
use http::{Extensions, HeaderMap, Method, Request, Uri, Version};
use http_body::Body as HttpBody;
use std::{
convert::Infallible,
fmt,
pin::Pin,
task::{Context, Poll},
};
use sync_wrapper::SyncWrapper;
#[async_trait]
impl<B> FromRequest<B> for Request<B>
@ -191,34 +194,48 @@ where
/// ```
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug)]
pub struct BodyStream<B = crate::body::Body>(B);
pub struct BodyStream(
SyncWrapper<Pin<Box<dyn http_body::Body<Data = Bytes, Error = Error> + Send + 'static>>>,
);
impl<B> Stream for BodyStream<B>
where
B: http_body::Body + Unpin,
{
type Item = Result<B::Data, B::Error>;
impl Stream for BodyStream {
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_data(cx)
Pin::new(self.0.get_mut()).poll_data(cx)
}
}
#[async_trait]
impl<B> FromRequest<B> for BodyStream<B>
impl<B> FromRequest<B> for BodyStream
where
B: http_body::Body + Unpin + Send,
B: HttpBody + Send + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
{
type Rejection = BodyAlreadyExtracted;
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
let body = take_body(req)?;
let stream = BodyStream(body);
let body = take_body(req)?
.map_data(Into::into)
.map_err(|err| Error::new(err.into()));
let stream = BodyStream(SyncWrapper::new(Box::pin(body)));
Ok(stream)
}
}
impl fmt::Debug for BodyStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("BodyStream").finish()
}
}
#[test]
fn body_stream_traits() {
crate::tests::assert_send::<BodyStream>();
crate::tests::assert_sync::<BodyStream>();
}
/// Extractor that extracts the raw request body.
///
/// # Example

View file

@ -698,3 +698,6 @@ where
addr
}
pub(crate) fn assert_send<T: Send>() {}
pub(crate) fn assert_sync<T: Sync>() {}