axum/examples/key_value_store.rs
David Pedersen 04d62798b6
Reduce body boxing (#9)
Previously, when routing between one or two requests, the two body types
would be merged by boxing them. This isn't ideal since it introduces a
layer of indirection for each route.

We can't require the services being routed between to share one body
type, as not all services use the same body type.

This changes that so it instead uses an `Either` enum that implements
`http_body::Body` if each variant does. Will reduce the overall
allocations and hopefully the compiler can optimize things if both
variants are the same.
2021-06-12 23:59:18 +02:00

136 lines
3.6 KiB
Rust

//! Simple in-memory key/value store showing features of tower-web.
//!
//! Run with:
//!
//! ```not_rust
//! RUST_LOG=tower_http=debug,key_value_store=trace cargo run --example key_value_store
//! ```
use bytes::Bytes;
use http::StatusCode;
use std::{
borrow::Cow,
collections::HashMap,
net::SocketAddr,
sync::{Arc, RwLock},
time::Duration,
};
use tower::{BoxError, ServiceBuilder};
use tower_http::{
add_extension::AddExtensionLayer, auth::RequireAuthorizationLayer,
compression::CompressionLayer, trace::TraceLayer,
};
use tower_web::{
extract::{ContentLengthLimit, Extension, UrlParams},
prelude::*,
response::IntoResponse,
routing::BoxRoute,
};
/// Entry point: installs the tracing subscriber, assembles the route tree and
/// its middleware, and serves the application on `127.0.0.1:3000`.
///
/// Panics if serving fails, since the result of `serve` is unwrapped.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Route tree. `GET /:key` responses go through the compression layer,
    // while `POST /:key` is deliberately left uncompressed. The admin routes
    // are nested under `/admin`.
    let routes = route(
        "/:key",
        get(kv_get.layer(CompressionLayer::new())).post(kv_set),
    )
    .route("/keys", get(list_keys))
    .nest("/admin", admin_routes());

    // Middleware applied to every route above. The final layer injects the
    // shared store so handlers can extract it with `Extension<SharedState>`.
    let middleware = ServiceBuilder::new()
        .load_shed()
        .concurrency_limit(1024)
        .timeout(Duration::from_secs(10))
        .layer(TraceLayer::new_for_http())
        .layer(AddExtensionLayer::new(SharedState::default()))
        .into_inner();

    // Errors produced by the middleware are turned into responses by `handle_error`.
    let app = routes.layer(middleware).handle_error(handle_error);

    // Run our app with hyper
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::debug!("listening on {}", addr);
    app.serve(&addr).await.unwrap();
}
/// Handle to the store that is injected into requests via `AddExtensionLayer`
/// and extracted by the handlers: a reference-counted `State` behind a
/// reader-writer lock.
type SharedState = Arc<RwLock<State>>;
/// In-memory contents of the key/value store; `Default` yields an empty store.
#[derive(Default)]
struct State {
/// Maps each key to the bytes stored under it.
db: HashMap<String, Bytes>,
}
/// Handler that looks up the `:key` path parameter in the shared store.
///
/// Returns a clone of the stored bytes, or `StatusCode::NOT_FOUND` when no
/// value exists under that key. Panics if the store's lock is poisoned.
async fn kv_get(
    UrlParams((key,)): UrlParams<(String,)>,
    Extension(state): Extension<SharedState>,
) -> Result<Bytes, StatusCode> {
    // The read guard is held until the function returns.
    let guard = state.read().unwrap();
    guard.db.get(&key).cloned().ok_or(StatusCode::NOT_FOUND)
}
/// Handler that stores the request body under the `:key` path parameter.
///
/// The body is extracted through `ContentLengthLimit` with a limit of
/// `1024 * 5_000`. Any value previously stored under the key is replaced.
/// Panics if the store's lock is poisoned.
async fn kv_set(
    UrlParams((key,)): UrlParams<(String,)>,
    ContentLengthLimit(bytes): ContentLengthLimit<Bytes, { 1024 * 5_000 }>, // ~5mb
    Extension(state): Extension<SharedState>,
) {
    let mut guard = state.write().unwrap();
    // `insert` hands back the old value (if any); it is intentionally discarded.
    guard.db.insert(key, bytes);
}
/// Handler that returns every key currently in the store, one per line.
///
/// Keys are separated by `"\n"` with no trailing newline. They appear in the
/// `HashMap`'s iteration order, which is unspecified. Panics if the store's
/// lock is poisoned.
async fn list_keys(Extension(state): Extension<SharedState>) -> String {
    let guard = state.read().unwrap();
    // Borrow the keys instead of cloning each one: `join` only needs `&str`
    // and allocates the single output `String` itself.
    let keys: Vec<&str> = guard.db.keys().map(String::as_str).collect();
    keys.join("\n")
}
/// Builds the admin routes (nested under `/admin` by `main`).
///
/// - `DELETE /keys` removes every entry from the store.
/// - `DELETE /key/:key` removes a single entry.
///
/// Every route is wrapped in a bearer-token authorization layer.
fn admin_routes() -> BoxRoute {
    /// Empties the whole store.
    async fn delete_all_keys(Extension(state): Extension<SharedState>) {
        let mut guard = state.write().unwrap();
        guard.db.clear();
    }

    /// Removes the entry stored under `key`, if there is one.
    async fn remove_key(
        UrlParams((key,)): UrlParams<(String,)>,
        Extension(state): Extension<SharedState>,
    ) {
        let mut guard = state.write().unwrap();
        // The removed value (if any) is intentionally discarded.
        guard.db.remove(&key);
    }

    let routes = route("/keys", delete(delete_all_keys)).route("/key/:key", delete(remove_key));

    // Require bearer auth for all admin routes
    routes
        .layer(RequireAuthorizationLayer::bearer("secret-token"))
        .boxed()
}
fn handle_error(error: BoxError) -> impl IntoResponse {
if error.is::<tower::timeout::error::Elapsed>() {
return (StatusCode::REQUEST_TIMEOUT, Cow::from("request timed out"));
}
if error.is::<tower::load_shed::error::Overloaded>() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Cow::from("service is overloaded, try again later"),
);
}
(
StatusCode::INTERNAL_SERVER_ERROR,
Cow::from(format!("Unhandled internal error: {}", error)),
)
}