Add StreamBody (#237)

This adds `StreamBody` which converts a `Stream` of `Bytes` into a `http_body::Body`.

---

As suggested by Kestrer on Discord it would make sense for axum to provide different kinds of body types other than `Empty`, `Full`, and `hyper::Body`. There is also some talk about [splitting up `hyper::Body`](https://github.com/hyperium/hyper/issues/2345) so this can be seen as getting started on that effort. axum's body types could be moved to hyper or http-body if thats the direction we decide on.

The types I'm thinking about adding are:

- `StreamBody`-  added in this PR
- `AsyncReadBody` - similar to [http-body#41](https://github.com/hyperium/http-body/pull/41/files)
- `ChannelBody` - similar to `hyper::Body::channel`
This commit is contained in:
David Pedersen 2021-08-22 14:41:51 +02:00 committed by GitHub
parent 5ae94b6a24
commit 2322d39800
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 0 deletions

View file

@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Responses:
- **added:** Add `Headers` for easily customizing headers on a response ([#193](https://github.com/tokio-rs/axum/pull/193))
- **added:** Add `Redirect` response ([#192](https://github.com/tokio-rs/axum/pull/192))
- **added:** Add `body::StreamBody` for easily responding with a stream of byte chunks ([#237](https://github.com/tokio-rs/axum/pull/237))
- **changed:** Add associated `Body` and `BodyError` types to `IntoResponse`. This is
required for returning responses with bodies other than `hyper::Body` from
handlers. See the docs for advice on how to implement `IntoResponse` ([#86](https://github.com/tokio-rs/axum/pull/86))

View file

@ -3,6 +3,10 @@
use crate::BoxError;
use crate::Error;
mod stream_body;
pub use self::stream_body::StreamBody;
#[doc(no_inline)]
pub use http_body::{Body as HttpBody, Empty, Full};

103
src/body/stream_body.rs Normal file
View file

@ -0,0 +1,103 @@
use crate::{BoxError, Error};
use bytes::Bytes;
use futures_util::stream::{self, Stream, TryStreamExt};
use http::HeaderMap;
use http_body::Body;
use std::convert::Infallible;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
use sync_wrapper::SyncWrapper;
/// An [`http_body::Body`] created from a [`Stream`].
///
/// # Example
///
/// ```
/// use axum::{
/// Router,
/// handler::get,
/// body::StreamBody,
/// };
/// use futures::stream;
///
/// async fn handler() -> StreamBody {
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("Hello,"),
/// Ok(" "),
/// Ok("world!"),
/// ];
/// let stream = stream::iter(chunks);
/// StreamBody::new(stream)
/// }
///
/// let app = Router::new().route("/", get(handler));
/// # async {
/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
/// # };
/// ```
///
/// [`Stream`]: futures_util::stream::Stream
// this should probably be extracted to `http_body`, eventually...
pub struct StreamBody {
stream: SyncWrapper<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>>,
}
impl StreamBody {
/// Create a new `StreamBody` from a [`Stream`].
///
/// [`Stream`]: futures_util::stream::Stream
pub fn new<S, T, E>(stream: S) -> Self
where
S: Stream<Item = Result<T, E>> + Send + 'static,
T: Into<Bytes> + 'static,
E: Into<BoxError> + 'static,
{
let stream = stream
.map_ok(Into::into)
.map_err(|err| Error::new(err.into()));
Self {
stream: SyncWrapper::new(Box::pin(stream)),
}
}
}
impl Default for StreamBody {
fn default() -> Self {
Self::new(stream::empty::<Result<Bytes, Infallible>>())
}
}
impl fmt::Debug for StreamBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("StreamBody").finish()
}
}
impl Body for StreamBody {
type Data = Bytes;
type Error = Error;
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(self.stream.get_mut()).poll_next(cx)
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
#[test]
fn stream_body_traits() {
crate::tests::assert_send::<StreamBody>();
crate::tests::assert_sync::<StreamBody>();
crate::tests::assert_unpin::<StreamBody>();
}

View file

@ -198,6 +198,7 @@ macro_rules! impl_into_response_for_body {
impl_into_response_for_body!(hyper::Body);
impl_into_response_for_body!(Full<Bytes>);
impl_into_response_for_body!(Empty<Bytes>);
impl_into_response_for_body!(crate::body::StreamBody);
impl<E> IntoResponse for http_body::combinators::BoxBody<Bytes, E>
where

View file

@ -701,3 +701,4 @@ where
pub(crate) fn assert_send<T: Send>() {}
pub(crate) fn assert_sync<T: Sync>() {}
pub(crate) fn assert_unpin<T: Unpin>() {}