Fix driving readiness for example-rest-grpc-multiplex (#988)

This commit is contained in:
David Pedersen 2022-05-03 11:21:49 +02:00 committed by GitHub
parent d2fab9245b
commit d1043db254
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,4 +1,4 @@
use axum::{body::BoxBody, response::IntoResponse}; use axum::{body::BoxBody, http::header::CONTENT_TYPE, response::IntoResponse};
use futures::{future::BoxFuture, ready}; use futures::{future::BoxFuture, ready};
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response};
use std::{ use std::{
@ -7,7 +7,6 @@ use std::{
}; };
use tower::Service; use tower::Service;
#[derive(Clone)]
pub struct MultiplexService<A, B> { pub struct MultiplexService<A, B> {
rest: A, rest: A,
rest_ready: bool, rest_ready: bool,
@ -26,6 +25,22 @@ impl<A, B> MultiplexService<A, B> {
} }
} }
impl<A, B> Clone for MultiplexService<A, B>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
rest: self.rest.clone(),
grpc: self.grpc.clone(),
// the cloned services probably wont be ready
rest_ready: false,
grpc_ready: false,
}
}
}
impl<A, B> Service<Request<Body>> for MultiplexService<A, B> impl<A, B> Service<Request<Body>> for MultiplexService<A, B>
where where
A: Service<Request<Body>, Error = Infallible>, A: Service<Request<Body>, Error = Infallible>,
@ -40,6 +55,7 @@ where
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// drive readiness for each inner service and record which is ready
loop { loop {
match (self.rest_ready, self.grpc_ready) { match (self.rest_ready, self.grpc_ready) {
(true, true) => { (true, true) => {
@ -47,7 +63,7 @@ where
} }
(false, _) => { (false, _) => {
ready!(self.rest.poll_ready(cx))?; ready!(self.rest.poll_ready(cx))?;
self.rest_ready = false; self.rest_ready = true;
} }
(_, false) => { (_, false) => {
ready!(self.grpc.poll_ready(cx))?; ready!(self.grpc.poll_ready(cx))?;
@ -58,6 +74,19 @@ where
} }
fn call(&mut self, req: Request<Body>) -> Self::Future { fn call(&mut self, req: Request<Body>) -> Self::Future {
// require users to call `poll_ready` first, if they don't we're allowed to panic
// as per the `tower::Service` contract
assert!(
self.grpc_ready,
"grpc service not ready. Did you forget to call `poll_ready`?"
);
assert!(
self.rest_ready,
"rest service not ready. Did you forget to call `poll_ready`?"
);
// if we get a grpc request call the grpc service, otherwise call the rest service
// when calling a service it becomes not-ready so we have drive readiness again
if is_grpc_request(&req) { if is_grpc_request(&req) {
self.grpc_ready = false; self.grpc_ready = false;
let future = self.grpc.call(req); let future = self.grpc.call(req);
@ -78,7 +107,7 @@ where
fn is_grpc_request<B>(req: &Request<B>) -> bool { fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers() req.headers()
.get("content-type") .get(CONTENT_TYPE)
.map(|content_type| content_type.as_bytes()) .map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc")) .filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some() .is_some()