Example usage:
```rust
use axum::{prelude::*, sse::{sse, Event, KeepAlive}};
use tokio_stream::StreamExt as _;
use futures::stream::{self, Stream};
use std::{
time::Duration,
convert::Infallible,
};
let app = route("/sse", sse(make_stream).keep_alive(KeepAlive::default()));
async fn make_stream(
// you can also put extractors here
) -> Result<impl Stream<Item = Result<Event, Infallible>>, Infallible> {
// A `Stream` that repeats an event every second
let stream = stream::repeat_with(|| Event::default().data("hi!"))
.map(Ok)
.throttle(Duration::from_secs(1));
Ok(stream)
}
```
Implementation is based on [warp's](https://github.com/seanmonstar/warp/blob/master/src/filters/sse.rs)
Fixes https://github.com/tokio-rs/axum/issues/43
With this you can get the remote address like so:
```rust
use axum::{prelude::*, extract::ConnectInfo};
use std::net::SocketAddr;
let app = route("/", get(handler));
async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> String {
format!("Hello {}", addr)
}
// Starting the app with `into_make_service_with_connect_info` is required
// for `ConnectInfo` to work.
let make_svc = app.into_make_service_with_connect_info::<SocketAddr, _>();
hyper::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(make_svc)
.await
.expect("server failed");
```
This API is fully generic and supports whatever transport layer you're using with Hyper. I've updated the unix domain socket example to extract `peer_creds` and `peer_addr`.
Not actually related to Axum, can be implemented directly with Hyper, but I figure its nice to have for demonstration and might help catch accidental breaking changes in the future.
Previously extractors worked directly on `Request<B>` which meant you
had to do weird tricks like `mem::take(req.headers_mut())` to get owned
parts of the request.
This changes that instead to use a new `RequestParts` type that have
methods to "take" each part of the request. Without having to do weird
tricks.
Also removed the need to have `B: Default` for body extractors.
This changes error model to actually allow errors. I think if we're going to use this for things like tonic's route we need a more flexible error handling model. The same `handle_error` adaptors are still there but services aren't required to have `Infallible` as their error type. The error type is simply propagated all the way through.
Previously, when routing between one or two requests the two body types
would be merged by boxing them. This isn't ideal since it introduces a
layer indirection for each route.
We can't require the services to be routed between as not all services
use the same body type.
This changes that so it instead uses an `Either` enum that implements
`http_body::Body` if each variant does. Will reduce the overall
allocations and hopefully the compiler can optimize things if both
variants are the same.
Basically a copy/paste of whats in warp.
Example usage:
```rust
use tower_web::{prelude::*, ws::{ws, WebSocket}};
let app = route("/ws", ws(handle_socket));
async fn handle_socket(mut socket: WebSocket) {
while let Some(msg) = socket.recv().await {
let msg = msg.unwrap();
socket.send(msg).await.unwrap();
}
}
```