Add rest and grpc example (#967)

* Add rest and grpc example

* remove needless dependencies

* Apply suggestions from code review

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* Update examples/rest-grpc-multiplex/src/main.rs

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* Update examples/rest-grpc-multiplex/src/main.rs

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* Update examples/rest-grpc-multiplex/src/multiplex_service.rs

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* Update examples/rest-grpc-multiplex/Cargo.toml

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* Update examples/rest-grpc-multiplex/src/multiplex_service.rs

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>

* clean noisy code

* fix nitpicks

* missing newline

Co-authored-by: David Pedersen <david.pdrsn@gmail.com>
This commit is contained in:
takumi 2022-05-02 23:52:55 +08:00 committed by GitHub
parent a223fd8b85
commit 4ff5e409e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 216 additions and 0 deletions

View file

@ -0,0 +1,19 @@
[package]
name = "example-rest-grpc-multiplex"
version = "0.1.0"
edition = "2018"
publish = false
[dependencies]
axum = { path = "../../axum" }
futures = "0.3"
hyper = { version = "0.14", features = ["full"] }
prost = "0.10"
tokio = { version = "1", features = ["full"] }
tonic = { version = "0.7" }
tower = { version = "0.4", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[build-dependencies]
tonic-build = { version = "0.7", features = ["prost"] }

View file

@ -0,0 +1,3 @@
fn main() {
tonic_build::compile_protos("proto/helloworld.proto").unwrap();
}

View file

@ -0,0 +1,37 @@
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}

View file

@ -0,0 +1,72 @@
//! Run with
//!
//! ```not_rust
//! cd examples && cargo run -p example-rest-grpc-multiplex
//! ```
use self::multiplex_service::MultiplexService;
use axum::{routing::get, Router};
use proto::{
greeter_server::{Greeter, GreeterServer},
HelloReply, HelloRequest,
};
use std::net::SocketAddr;
use tonic::{Response as TonicResponse, Status};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod multiplex_service;
mod proto {
tonic::include_proto!("helloworld");
}
#[derive(Default)]
struct GrpcServiceImpl {}
#[tonic::async_trait]
impl Greeter for GrpcServiceImpl {
async fn say_hello(
&self,
request: tonic::Request<HelloRequest>,
) -> Result<TonicResponse<HelloReply>, Status> {
tracing::info!("Got a request from {:?}", request.remote_addr());
let reply = HelloReply {
message: format!("Hello {}!", request.into_inner().name),
};
Ok(TonicResponse::new(reply))
}
}
async fn web_root() -> &'static str {
"Hello, World!"
}
#[tokio::main]
async fn main() {
// initialize tracing
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::new(
std::env::var("RUST_LOG")
.unwrap_or_else(|_| "example_rest_grpc_multiplex=debug".into()),
))
.with(tracing_subscriber::fmt::layer())
.init();
// build the rest service
let rest = Router::new().route("/", get(web_root));
// build the grpc service
let grpc = GreeterServer::new(GrpcServiceImpl::default());
// combine them into one service
let service = MultiplexService::new(rest, grpc);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(tower::make::Shared::new(service))
.await
.unwrap();
}

View file

@ -0,0 +1,85 @@
use axum::{body::BoxBody, response::IntoResponse};
use futures::{future::BoxFuture, ready};
use hyper::{Body, Request, Response};
use std::{
convert::Infallible,
task::{Context, Poll},
};
use tower::Service;
#[derive(Clone)]
pub struct MultiplexService<A, B> {
rest: A,
rest_ready: bool,
grpc: B,
grpc_ready: bool,
}
impl<A, B> MultiplexService<A, B> {
pub fn new(rest: A, grpc: B) -> Self {
Self {
rest,
rest_ready: false,
grpc,
grpc_ready: false,
}
}
}
impl<A, B> Service<Request<Body>> for MultiplexService<A, B>
where
A: Service<Request<Body>, Error = Infallible>,
A::Response: IntoResponse,
A::Future: Send + 'static,
B: Service<Request<Body>, Error = Infallible>,
B::Response: IntoResponse,
B::Future: Send + 'static,
{
type Response = Response<BoxBody>;
type Error = Infallible;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match (self.rest_ready, self.grpc_ready) {
(true, true) => {
return Ok(()).into();
}
(false, _) => {
ready!(self.rest.poll_ready(cx))?;
self.rest_ready = false;
}
(_, false) => {
ready!(self.grpc.poll_ready(cx))?;
self.grpc_ready = true;
}
}
}
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
if is_grpc_request(&req) {
self.grpc_ready = false;
let future = self.grpc.call(req);
Box::pin(async move {
let res = future.await?;
Ok(res.into_response())
})
} else {
self.rest_ready = false;
let future = self.rest.call(req);
Box::pin(async move {
let res = future.await?;
Ok(res.into_response())
})
}
}
}
fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers()
.get("content-type")
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}