mirror of
https://github.com/tokio-rs/axum.git
synced 2025-03-13 19:27:53 +01:00
Update unix-domain-socket, listen-multiple-addrs, and testing examples (#2358)
This commit is contained in:
parent
db344fab3f
commit
e63cc49f55
6 changed files with 335 additions and 403 deletions
|
@ -7,4 +7,6 @@ publish = false
|
|||
[dependencies]
|
||||
axum = { path = "../../axum" }
|
||||
hyper = { version = "1.0.0", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tower = { version = "0.4", features = ["util"] }
|
||||
|
|
|
@ -1,72 +1,57 @@
|
|||
//! Showcases how listening on multiple addrs is possible by
|
||||
//! implementing Accept for a custom struct.
|
||||
//! Showcases how listening on multiple addrs is possible.
|
||||
//!
|
||||
//! This may be useful in cases where the platform does not
|
||||
//! listen on both IPv4 and IPv6 when the IPv6 catch-all listener is used (`::`),
|
||||
//! [like older versions of Windows.](https://docs.microsoft.com/en-us/windows/win32/winsock/dual-stack-sockets)
|
||||
|
||||
//! Showcases how listening on multiple addrs is possible by
|
||||
//! implementing Accept for a custom struct.
|
||||
//!
|
||||
//! This may be useful in cases where the platform does not
|
||||
//! listen on both IPv4 and IPv6 when the IPv6 catch-all listener is used (`::`),
|
||||
//! [like older versions of Windows.](https://docs.microsoft.com/en-us/windows/win32/winsock/dual-stack-sockets)
|
||||
use axum::{extract::Request, routing::get, Router};
|
||||
use hyper::body::Incoming;
|
||||
use hyper_util::{
|
||||
rt::{TokioExecutor, TokioIo},
|
||||
server,
|
||||
};
|
||||
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use tokio::net::TcpListener;
|
||||
use tower::Service;
|
||||
|
||||
// TODO
|
||||
fn main() {
|
||||
eprint!("this example has not yet been updated to hyper 1.0");
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let app: Router = Router::new().route("/", get(|| async { "Hello, World!" }));
|
||||
|
||||
let localhost_v4 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080);
|
||||
let listener_v4 = TcpListener::bind(&localhost_v4).await.unwrap();
|
||||
|
||||
let localhost_v6 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 8080);
|
||||
let listener_v6 = TcpListener::bind(&localhost_v6).await.unwrap();
|
||||
|
||||
// See https://github.com/tokio-rs/axum/blob/main/examples/serve-with-hyper/src/main.rs for
|
||||
// more details about this setup
|
||||
loop {
|
||||
// Accept connections from `listener_v4` and `listener_v6` at the same time
|
||||
let (socket, _remote_addr) = tokio::select! {
|
||||
result = listener_v4.accept() => {
|
||||
result.unwrap()
|
||||
}
|
||||
result = listener_v6.accept() => {
|
||||
result.unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
let tower_service = app.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let socket = TokioIo::new(socket);
|
||||
|
||||
let hyper_service = hyper::service::service_fn(move |request: Request<Incoming>| {
|
||||
tower_service.clone().call(request)
|
||||
});
|
||||
|
||||
if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
|
||||
.serve_connection_with_upgrades(socket, hyper_service)
|
||||
.await
|
||||
{
|
||||
eprintln!("failed to serve connection: {err:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// use axum::{routing::get, Router};
|
||||
// use hyper::server::{accept::Accept, conn::AddrIncoming};
|
||||
// use std::{
|
||||
// net::{Ipv4Addr, Ipv6Addr, SocketAddr},
|
||||
// pin::Pin,
|
||||
// task::{Context, Poll},
|
||||
// };
|
||||
|
||||
// #[tokio::main]
|
||||
// async fn main() {
|
||||
// let app = Router::new().route("/", get(|| async { "Hello, World!" }));
|
||||
|
||||
// let localhost_v4 = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8080);
|
||||
// let incoming_v4 = AddrIncoming::bind(&localhost_v4).unwrap();
|
||||
|
||||
// let localhost_v6 = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 8080);
|
||||
// let incoming_v6 = AddrIncoming::bind(&localhost_v6).unwrap();
|
||||
|
||||
// let combined = CombinedIncoming {
|
||||
// a: incoming_v4,
|
||||
// b: incoming_v6,
|
||||
// };
|
||||
|
||||
// hyper::Server::builder(combined)
|
||||
// .serve(app.into_make_service())
|
||||
// .await
|
||||
// .unwrap();
|
||||
// }
|
||||
|
||||
// struct CombinedIncoming {
|
||||
// a: AddrIncoming,
|
||||
// b: AddrIncoming,
|
||||
// }
|
||||
|
||||
// impl Accept for CombinedIncoming {
|
||||
// type Conn = <AddrIncoming as Accept>::Conn;
|
||||
// type Error = <AddrIncoming as Accept>::Error;
|
||||
|
||||
// fn poll_accept(
|
||||
// mut self: Pin<&mut Self>,
|
||||
// cx: &mut Context<'_>,
|
||||
// ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
||||
// if let Poll::Ready(Some(value)) = Pin::new(&mut self.a).poll_accept(cx) {
|
||||
// return Poll::Ready(Some(value));
|
||||
// }
|
||||
|
||||
// if let Poll::Ready(Some(value)) = Pin::new(&mut self.b).poll_accept(cx) {
|
||||
// return Poll::Ready(Some(value));
|
||||
// }
|
||||
|
||||
// Poll::Pending
|
||||
// }
|
||||
// }
|
||||
|
|
|
@ -8,6 +8,7 @@ publish = false
|
|||
axum = { path = "../../axum" }
|
||||
http-body-util = "0.1.0"
|
||||
hyper = { version = "1.0.0", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["client", "http1", "client-legacy"] }
|
||||
mime = "0.3"
|
||||
serde_json = "1.0"
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
|
|
|
@ -4,208 +4,198 @@
|
|||
//! cargo test -p example-testing
|
||||
//! ```
|
||||
|
||||
fn main() {
|
||||
// This example has not yet been updated to Hyper 1.0
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::{
|
||||
extract::ConnectInfo,
|
||||
routing::{get, post},
|
||||
Json, Router,
|
||||
};
|
||||
use tower_http::trace::TraceLayer;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::registry()
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "example_testing=debug,tower_http=debug".into()),
|
||||
)
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
|
||||
.await
|
||||
.unwrap();
|
||||
tracing::debug!("listening on {}", listener.local_addr().unwrap());
|
||||
axum::serve(listener, app()).await.unwrap();
|
||||
}
|
||||
|
||||
//use std::net::SocketAddr;
|
||||
/// Having a function that produces our app makes it easy to call it from tests
|
||||
/// without having to create an HTTP server.
|
||||
fn app() -> Router {
|
||||
Router::new()
|
||||
.route("/", get(|| async { "Hello, World!" }))
|
||||
.route(
|
||||
"/json",
|
||||
post(|payload: Json<serde_json::Value>| async move {
|
||||
Json(serde_json::json!({ "data": payload.0 }))
|
||||
}),
|
||||
)
|
||||
.route(
|
||||
"/requires-connect-into",
|
||||
get(|ConnectInfo(addr): ConnectInfo<SocketAddr>| async move { format!("Hi {addr}") }),
|
||||
)
|
||||
// We can still add middleware
|
||||
.layer(TraceLayer::new_for_http())
|
||||
}
|
||||
|
||||
//use axum::{
|
||||
// extract::ConnectInfo,
|
||||
// routing::{get, post},
|
||||
// Json, Router,
|
||||
//};
|
||||
//use tower_http::trace::TraceLayer;
|
||||
//use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::connect_info::MockConnectInfo,
|
||||
http::{self, Request, StatusCode},
|
||||
};
|
||||
use http_body_util::BodyExt;
|
||||
use serde_json::{json, Value};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::Service; // for `call`
|
||||
use tower::ServiceExt; // for `oneshot` and `ready` // for `collect`
|
||||
|
||||
//#[tokio::main]
|
||||
//async fn main() {
|
||||
// tracing_subscriber::registry()
|
||||
// .with(
|
||||
// tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
// .unwrap_or_else(|_| "example_testing=debug,tower_http=debug".into()),
|
||||
// )
|
||||
// .with(tracing_subscriber::fmt::layer())
|
||||
// .init();
|
||||
#[tokio::test]
|
||||
async fn hello_world() {
|
||||
let app = app();
|
||||
|
||||
// let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
|
||||
// .await
|
||||
// .unwrap();
|
||||
// tracing::debug!("listening on {}", listener.local_addr().unwrap());
|
||||
// axum::serve(listener, app()).await.unwrap();
|
||||
//}
|
||||
// `Router` implements `tower::Service<Request<Body>>` so we can
|
||||
// call it like any tower service, no need to run an HTTP server.
|
||||
let response = app
|
||||
.oneshot(Request::builder().uri("/").body(Body::empty()).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
///// Having a function that produces our app makes it easy to call it from tests
|
||||
///// without having to create an HTTP server.
|
||||
//fn app() -> Router {
|
||||
// Router::new()
|
||||
// .route("/", get(|| async { "Hello, World!" }))
|
||||
// .route(
|
||||
// "/json",
|
||||
// post(|payload: Json<serde_json::Value>| async move {
|
||||
// Json(serde_json::json!({ "data": payload.0 }))
|
||||
// }),
|
||||
// )
|
||||
// .route(
|
||||
// "/requires-connect-into",
|
||||
// get(|ConnectInfo(addr): ConnectInfo<SocketAddr>| async move { format!("Hi {addr}") }),
|
||||
// )
|
||||
// // We can still add middleware
|
||||
// .layer(TraceLayer::new_for_http())
|
||||
//}
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
//#[cfg(test)]
|
||||
//mod tests {
|
||||
// use super::*;
|
||||
// use axum::{
|
||||
// body::Body,
|
||||
// extract::connect_info::MockConnectInfo,
|
||||
// http::{self, Request, StatusCode},
|
||||
// };
|
||||
// use http_body_util::BodyExt;
|
||||
// use serde_json::{json, Value};
|
||||
// use std::net::SocketAddr;
|
||||
// use tokio::net::{TcpListener, TcpStream};
|
||||
// use tower::Service; // for `call`
|
||||
// use tower::ServiceExt; // for `oneshot` and `ready` // for `collect`
|
||||
let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
assert_eq!(&body[..], b"Hello, World!");
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn hello_world() {
|
||||
// let app = app();
|
||||
#[tokio::test]
|
||||
async fn json() {
|
||||
let app = app();
|
||||
|
||||
// // `Router` implements `tower::Service<Request<Body>>` so we can
|
||||
// // call it like any tower service, no need to run an HTTP server.
|
||||
// let response = app
|
||||
// .oneshot(Request::builder().uri("/").body(Body::empty()).unwrap())
|
||||
// .await
|
||||
// .unwrap();
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method(http::Method::POST)
|
||||
.uri("/json")
|
||||
.header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&json!([1, 2, 3, 4])).unwrap(),
|
||||
))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// assert_eq!(response.status(), StatusCode::OK);
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
// let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
// assert_eq!(&body[..], b"Hello, World!");
|
||||
// }
|
||||
let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
let body: Value = serde_json::from_slice(&body).unwrap();
|
||||
assert_eq!(body, json!({ "data": [1, 2, 3, 4] }));
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn json() {
|
||||
// let app = app();
|
||||
#[tokio::test]
|
||||
async fn not_found() {
|
||||
let app = app();
|
||||
|
||||
// let response = app
|
||||
// .oneshot(
|
||||
// Request::builder()
|
||||
// .method(http::Method::POST)
|
||||
// .uri("/json")
|
||||
// .header(http::header::CONTENT_TYPE, mime::APPLICATION_JSON.as_ref())
|
||||
// .body(Body::from(
|
||||
// serde_json::to_vec(&json!([1, 2, 3, 4])).unwrap(),
|
||||
// ))
|
||||
// .unwrap(),
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/does-not-exist")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// assert_eq!(response.status(), StatusCode::OK);
|
||||
assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
||||
let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
assert!(body.is_empty());
|
||||
}
|
||||
|
||||
// let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
// let body: Value = serde_json::from_slice(&body).unwrap();
|
||||
// assert_eq!(body, json!({ "data": [1, 2, 3, 4] }));
|
||||
// }
|
||||
// You can also spawn a server and talk to it like any other HTTP server:
|
||||
#[tokio::test]
|
||||
async fn the_real_deal() {
|
||||
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn not_found() {
|
||||
// let app = app();
|
||||
tokio::spawn(async move {
|
||||
axum::serve(listener, app()).await.unwrap();
|
||||
});
|
||||
|
||||
// let response = app
|
||||
// .oneshot(
|
||||
// Request::builder()
|
||||
// .uri("/does-not-exist")
|
||||
// .body(Body::empty())
|
||||
// .unwrap(),
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
let client =
|
||||
hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
|
||||
.build_http();
|
||||
|
||||
// assert_eq!(response.status(), StatusCode::NOT_FOUND);
|
||||
// let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
// assert!(body.is_empty());
|
||||
// }
|
||||
let response = client
|
||||
.request(
|
||||
Request::builder()
|
||||
.uri(format!("http://{addr}"))
|
||||
.header("Host", "localhost")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// // You can also spawn a server and talk to it like any other HTTP server:
|
||||
// #[tokio::test]
|
||||
// async fn the_real_deal() {
|
||||
// // TODO(david): convert this to hyper-util when thats published
|
||||
let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
assert_eq!(&body[..], b"Hello, World!");
|
||||
}
|
||||
|
||||
// use hyper::client::conn;
|
||||
// You can use `ready()` and `call()` to avoid using `clone()`
|
||||
// in multiple request
|
||||
#[tokio::test]
|
||||
async fn multiple_request() {
|
||||
let mut app = app().into_service();
|
||||
|
||||
// let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
|
||||
// let addr = listener.local_addr().unwrap();
|
||||
let request = Request::builder().uri("/").body(Body::empty()).unwrap();
|
||||
let response = ServiceExt::<Request<Body>>::ready(&mut app)
|
||||
.await
|
||||
.unwrap()
|
||||
.call(request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
// tokio::spawn(async move {
|
||||
// axum::serve(listener, app()).await.unwrap();
|
||||
// });
|
||||
let request = Request::builder().uri("/").body(Body::empty()).unwrap();
|
||||
let response = ServiceExt::<Request<Body>>::ready(&mut app)
|
||||
.await
|
||||
.unwrap()
|
||||
.call(request)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
// let target_stream = TcpStream::connect(addr).await.unwrap();
|
||||
// Here we're calling `/requires-connect-into` which requires `ConnectInfo`
|
||||
//
|
||||
// That is normally set with `Router::into_make_service_with_connect_info` but we can't easily
|
||||
// use that during tests. The solution is instead to set the `MockConnectInfo` layer during
|
||||
// tests.
|
||||
#[tokio::test]
|
||||
async fn with_into_make_service_with_connect_info() {
|
||||
let mut app = app()
|
||||
.layer(MockConnectInfo(SocketAddr::from(([0, 0, 0, 0], 3000))))
|
||||
.into_service();
|
||||
|
||||
// let (mut request_sender, connection) = conn::http1::handshake(target_stream).await.unwrap();
|
||||
|
||||
// tokio::spawn(async move { connection.await.unwrap() });
|
||||
|
||||
// let response = request_sender
|
||||
// .send_request(
|
||||
// Request::builder()
|
||||
// .uri(format!("http://{addr}"))
|
||||
// .header("Host", "localhost")
|
||||
// .body(Body::empty())
|
||||
// .unwrap(),
|
||||
// )
|
||||
// .await
|
||||
// .unwrap();
|
||||
|
||||
// let body = response.into_body().collect().await.unwrap().to_bytes();
|
||||
// assert_eq!(&body[..], b"Hello, World!");
|
||||
// }
|
||||
|
||||
// // You can use `ready()` and `call()` to avoid using `clone()`
|
||||
// // in multiple request
|
||||
// #[tokio::test]
|
||||
// async fn multiple_request() {
|
||||
// let mut app = app().into_service();
|
||||
|
||||
// let request = Request::builder().uri("/").body(Body::empty()).unwrap();
|
||||
// let response = ServiceExt::<Request<Body>>::ready(&mut app)
|
||||
// .await
|
||||
// .unwrap()
|
||||
// .call(request)
|
||||
// .await
|
||||
// .unwrap();
|
||||
// assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
// let request = Request::builder().uri("/").body(Body::empty()).unwrap();
|
||||
// let response = ServiceExt::<Request<Body>>::ready(&mut app)
|
||||
// .await
|
||||
// .unwrap()
|
||||
// .call(request)
|
||||
// .await
|
||||
// .unwrap();
|
||||
// assert_eq!(response.status(), StatusCode::OK);
|
||||
// }
|
||||
|
||||
// // Here we're calling `/requires-connect-into` which requires `ConnectInfo`
|
||||
// //
|
||||
// // That is normally set with `Router::into_make_service_with_connect_info` but we can't easily
|
||||
// // use that during tests. The solution is instead to set the `MockConnectInfo` layer during
|
||||
// // tests.
|
||||
// #[tokio::test]
|
||||
// async fn with_into_make_service_with_connect_info() {
|
||||
// let mut app = app()
|
||||
// .layer(MockConnectInfo(SocketAddr::from(([0, 0, 0, 0], 3000))))
|
||||
// .into_service();
|
||||
|
||||
// let request = Request::builder()
|
||||
// .uri("/requires-connect-into")
|
||||
// .body(Body::empty())
|
||||
// .unwrap();
|
||||
// let response = app.ready().await.unwrap().call(request).await.unwrap();
|
||||
// assert_eq!(response.status(), StatusCode::OK);
|
||||
// }
|
||||
//}
|
||||
let request = Request::builder()
|
||||
.uri("/requires-connect-into")
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
let response = app.ready().await.unwrap().call(request).await.unwrap();
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,9 @@ publish = false
|
|||
|
||||
[dependencies]
|
||||
axum = { path = "../../axum" }
|
||||
futures = "0.3"
|
||||
http-body-util = "0.1"
|
||||
hyper = { version = "1.0.0", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["tokio", "server-auto", "http1"] }
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
tower = { version = "0.4", features = ["util"] }
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -4,183 +4,136 @@
|
|||
//! cargo run -p example-unix-domain-socket
|
||||
//! ```
|
||||
|
||||
// TODO
|
||||
fn main() {
|
||||
eprint!("this example has not yet been updated to hyper 1.0");
|
||||
#[cfg(unix)]
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
unix::server().await;
|
||||
}
|
||||
|
||||
// #[cfg(unix)]
|
||||
// #[tokio::main]
|
||||
// async fn main() {
|
||||
// unix::server().await;
|
||||
// }
|
||||
#[cfg(not(unix))]
|
||||
fn main() {
|
||||
println!("This example requires unix")
|
||||
}
|
||||
|
||||
// #[cfg(not(unix))]
|
||||
// fn main() {
|
||||
// println!("This example requires unix")
|
||||
// }
|
||||
#[cfg(unix)]
|
||||
mod unix {
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::connect_info::{self, ConnectInfo},
|
||||
http::{Method, Request, StatusCode},
|
||||
routing::get,
|
||||
Router,
|
||||
};
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::body::Incoming;
|
||||
use hyper_util::{
|
||||
rt::{TokioExecutor, TokioIo},
|
||||
server,
|
||||
};
|
||||
use std::{convert::Infallible, path::PathBuf, sync::Arc};
|
||||
use tokio::net::{unix::UCred, UnixListener, UnixStream};
|
||||
use tower::Service;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
// #[cfg(unix)]
|
||||
// mod unix {
|
||||
// use axum::{
|
||||
// body::Body,
|
||||
// extract::connect_info::{self, ConnectInfo},
|
||||
// http::{Method, Request, StatusCode, Uri},
|
||||
// routing::get,
|
||||
// Router,
|
||||
// };
|
||||
// use futures::ready;
|
||||
// use hyper::{
|
||||
// client::connect::{Connected, Connection},
|
||||
// server::accept::Accept,
|
||||
// };
|
||||
// use std::{
|
||||
// io,
|
||||
// path::PathBuf,
|
||||
// pin::Pin,
|
||||
// sync::Arc,
|
||||
// task::{Context, Poll},
|
||||
// };
|
||||
// use tokio::{
|
||||
// io::{AsyncRead, AsyncWrite},
|
||||
// net::{unix::UCred, UnixListener, UnixStream},
|
||||
// };
|
||||
// use tower::BoxError;
|
||||
// use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
pub async fn server() {
|
||||
tracing_subscriber::registry()
|
||||
.with(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| "debug".into()),
|
||||
)
|
||||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
||||
// pub async fn server() {
|
||||
// tracing_subscriber::registry()
|
||||
// .with(
|
||||
// tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
// .unwrap_or_else(|_| "debug".into()),
|
||||
// )
|
||||
// .with(tracing_subscriber::fmt::layer())
|
||||
// .init();
|
||||
let path = PathBuf::from("/tmp/axum/helloworld");
|
||||
|
||||
// let path = PathBuf::from("/tmp/axum/helloworld");
|
||||
let _ = tokio::fs::remove_file(&path).await;
|
||||
tokio::fs::create_dir_all(path.parent().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// let _ = tokio::fs::remove_file(&path).await;
|
||||
// tokio::fs::create_dir_all(path.parent().unwrap())
|
||||
// .await
|
||||
// .unwrap();
|
||||
let uds = UnixListener::bind(path.clone()).unwrap();
|
||||
tokio::spawn(async move {
|
||||
let app = Router::new().route("/", get(handler));
|
||||
|
||||
// let uds = UnixListener::bind(path.clone()).unwrap();
|
||||
// tokio::spawn(async {
|
||||
// let app = Router::new().route("/", get(handler));
|
||||
let mut make_service = app.into_make_service_with_connect_info::<UdsConnectInfo>();
|
||||
|
||||
// hyper::Server::builder(ServerAccept { uds })
|
||||
// .serve(app.into_make_service_with_connect_info::<UdsConnectInfo>())
|
||||
// .await
|
||||
// .unwrap();
|
||||
// });
|
||||
// See https://github.com/tokio-rs/axum/blob/main/examples/serve-with-hyper/src/main.rs for
|
||||
// more details about this setup
|
||||
loop {
|
||||
let (socket, _remote_addr) = uds.accept().await.unwrap();
|
||||
|
||||
// let connector = tower::service_fn(move |_: Uri| {
|
||||
// let path = path.clone();
|
||||
// Box::pin(async move {
|
||||
// let stream = UnixStream::connect(path).await?;
|
||||
// Ok::<_, io::Error>(ClientConnection { stream })
|
||||
// })
|
||||
// });
|
||||
// let client = hyper::Client::builder().build(connector);
|
||||
let tower_service = unwrap_infallible(make_service.call(&socket).await);
|
||||
|
||||
// let request = Request::builder()
|
||||
// .method(Method::GET)
|
||||
// .uri("http://uri-doesnt-matter.com")
|
||||
// .body(Body::empty())
|
||||
// .unwrap();
|
||||
tokio::spawn(async move {
|
||||
let socket = TokioIo::new(socket);
|
||||
|
||||
// let response = client.request(request).await.unwrap();
|
||||
let hyper_service =
|
||||
hyper::service::service_fn(move |request: Request<Incoming>| {
|
||||
tower_service.clone().call(request)
|
||||
});
|
||||
|
||||
// assert_eq!(response.status(), StatusCode::OK);
|
||||
if let Err(err) = server::conn::auto::Builder::new(TokioExecutor::new())
|
||||
.serve_connection_with_upgrades(socket, hyper_service)
|
||||
.await
|
||||
{
|
||||
eprintln!("failed to serve connection: {err:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
|
||||
// let body = String::from_utf8(body.to_vec()).unwrap();
|
||||
// assert_eq!(body, "Hello, World!");
|
||||
// }
|
||||
let stream = TokioIo::new(UnixStream::connect(path).await.unwrap());
|
||||
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await.unwrap();
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(err) = conn.await {
|
||||
println!("Connection failed: {:?}", err);
|
||||
}
|
||||
});
|
||||
|
||||
// async fn handler(ConnectInfo(info): ConnectInfo<UdsConnectInfo>) -> &'static str {
|
||||
// println!("new connection from `{:?}`", info);
|
||||
let request = Request::builder()
|
||||
.method(Method::GET)
|
||||
.uri("http://uri-doesnt-matter.com")
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
// "Hello, World!"
|
||||
// }
|
||||
let response = sender.send_request(request).await.unwrap();
|
||||
|
||||
// struct ServerAccept {
|
||||
// uds: UnixListener,
|
||||
// }
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
// impl Accept for ServerAccept {
|
||||
// type Conn = UnixStream;
|
||||
// type Error = BoxError;
|
||||
let body = response.collect().await.unwrap().to_bytes();
|
||||
let body = String::from_utf8(body.to_vec()).unwrap();
|
||||
assert_eq!(body, "Hello, World!");
|
||||
}
|
||||
|
||||
// fn poll_accept(
|
||||
// self: Pin<&mut Self>,
|
||||
// cx: &mut Context<'_>,
|
||||
// ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
||||
// let (stream, _addr) = ready!(self.uds.poll_accept(cx))?;
|
||||
// Poll::Ready(Some(Ok(stream)))
|
||||
// }
|
||||
// }
|
||||
async fn handler(ConnectInfo(info): ConnectInfo<UdsConnectInfo>) -> &'static str {
|
||||
println!("new connection from `{:?}`", info);
|
||||
|
||||
// struct ClientConnection {
|
||||
// stream: UnixStream,
|
||||
// }
|
||||
"Hello, World!"
|
||||
}
|
||||
|
||||
// impl AsyncWrite for ClientConnection {
|
||||
// fn poll_write(
|
||||
// mut self: Pin<&mut Self>,
|
||||
// cx: &mut Context<'_>,
|
||||
// buf: &[u8],
|
||||
// ) -> Poll<Result<usize, io::Error>> {
|
||||
// Pin::new(&mut self.stream).poll_write(cx, buf)
|
||||
// }
|
||||
#[derive(Clone, Debug)]
|
||||
#[allow(dead_code)]
|
||||
struct UdsConnectInfo {
|
||||
peer_addr: Arc<tokio::net::unix::SocketAddr>,
|
||||
peer_cred: UCred,
|
||||
}
|
||||
|
||||
// fn poll_flush(
|
||||
// mut self: Pin<&mut Self>,
|
||||
// cx: &mut Context<'_>,
|
||||
// ) -> Poll<Result<(), io::Error>> {
|
||||
// Pin::new(&mut self.stream).poll_flush(cx)
|
||||
// }
|
||||
impl connect_info::Connected<&UnixStream> for UdsConnectInfo {
|
||||
fn connect_info(target: &UnixStream) -> Self {
|
||||
let peer_addr = target.peer_addr().unwrap();
|
||||
let peer_cred = target.peer_cred().unwrap();
|
||||
|
||||
// fn poll_shutdown(
|
||||
// mut self: Pin<&mut Self>,
|
||||
// cx: &mut Context<'_>,
|
||||
// ) -> Poll<Result<(), io::Error>> {
|
||||
// Pin::new(&mut self.stream).poll_shutdown(cx)
|
||||
// }
|
||||
// }
|
||||
Self {
|
||||
peer_addr: Arc::new(peer_addr),
|
||||
peer_cred,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// impl AsyncRead for ClientConnection {
|
||||
// fn poll_read(
|
||||
// mut self: Pin<&mut Self>,
|
||||
// cx: &mut Context<'_>,
|
||||
// buf: &mut tokio::io::ReadBuf<'_>,
|
||||
// ) -> Poll<io::Result<()>> {
|
||||
// Pin::new(&mut self.stream).poll_read(cx, buf)
|
||||
// }
|
||||
// }
|
||||
|
||||
// impl Connection for ClientConnection {
|
||||
// fn connected(&self) -> Connected {
|
||||
// Connected::new()
|
||||
// }
|
||||
// }
|
||||
|
||||
// #[derive(Clone, Debug)]
|
||||
// #[allow(dead_code)]
|
||||
// struct UdsConnectInfo {
|
||||
// peer_addr: Arc<tokio::net::unix::SocketAddr>,
|
||||
// peer_cred: UCred,
|
||||
// }
|
||||
|
||||
// impl connect_info::Connected<&UnixStream> for UdsConnectInfo {
|
||||
// fn connect_info(target: &UnixStream) -> Self {
|
||||
// let peer_addr = target.peer_addr().unwrap();
|
||||
// let peer_cred = target.peer_cred().unwrap();
|
||||
|
||||
// Self {
|
||||
// peer_addr: Arc::new(peer_addr),
|
||||
// peer_cred,
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
fn unwrap_infallible<T>(result: Result<T, Infallible>) -> T {
|
||||
match result {
|
||||
Ok(value) => value,
|
||||
Err(err) => match err {},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue