axum::body no longer pulls in axum::body::stream_body (#2245)

Signed-off-by: wayne warren <wayne.warren.s@gmail.com>
This commit is contained in:
wayne 2023-09-29 00:52:26 -05:00 committed by GitHub
parent c8cf147657
commit 930e2ab7c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,149 +0,0 @@
use crate::{
body::{Bytes, HttpBody},
response::{IntoResponse, Response},
BoxError, Error,
};
use axum_core::body::Body;
use futures_util::{
ready,
stream::{self, TryStream},
};
use http::HeaderMap;
use pin_project_lite::pin_project;
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};
use sync_wrapper::SyncWrapper;
pin_project! {
/// An [`http_body::Body`] created from a [`Stream`].
///
/// The purpose of this type is to be used in responses. If you want to
/// extract the request body as a stream consider using
/// [`Body`](crate::body::Body).
///
/// # Example
///
/// ```
/// use axum::{
/// Router,
/// routing::get,
/// body::StreamBody,
/// response::IntoResponse,
/// };
/// use futures_util::stream::{self, Stream};
/// use std::io;
///
/// async fn handler() -> StreamBody<impl Stream<Item = io::Result<&'static str>>> {
/// let chunks: Vec<io::Result<_>> = vec![
/// Ok("Hello,"),
/// Ok(" "),
/// Ok("world!"),
/// ];
/// let stream = stream::iter(chunks);
/// StreamBody::new(stream)
/// }
///
/// let app = Router::new().route("/", get(handler));
/// # let _: Router = app;
/// ```
///
/// [`Stream`]: futures_util::stream::Stream
#[must_use]
pub struct StreamBody<S> {
#[pin]
stream: SyncWrapper<S>,
}
}
impl<S> From<S> for StreamBody<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
fn from(stream: S) -> Self {
Self::new(stream)
}
}
impl<S> StreamBody<S> {
/// Create a new `StreamBody` from a [`Stream`].
///
/// [`Stream`]: futures_util::stream::Stream
pub fn new(stream: S) -> Self
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
Self {
stream: SyncWrapper::new(stream),
}
}
}
impl<S> IntoResponse for StreamBody<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
fn into_response(self) -> Response {
Response::new(Body::new(self))
}
}
impl Default for StreamBody<futures_util::stream::Empty<Result<Bytes, Error>>> {
fn default() -> Self {
Self::new(stream::empty())
}
}
impl<S> fmt::Debug for StreamBody<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("StreamBody").finish()
}
}
impl<S> HttpBody for StreamBody<S>
where
S: TryStream,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
type Data = Bytes;
type Error = Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let stream = self.project().stream.get_pin_mut();
match ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
#[test]
fn stream_body_traits() {
use futures_util::stream::Empty;
type EmptyStream = StreamBody<Empty<Result<Bytes, BoxError>>>;
crate::test_helpers::assert_send::<EmptyStream>();
crate::test_helpers::assert_sync::<EmptyStream>();
crate::test_helpers::assert_unpin::<EmptyStream>();
}