Add an example integration test for SSE (#2465)

Signed-off-by: Hung-Han (Henry) Chen <chenhungh@gmail.com>
Co-authored-by: David Pedersen <david.pdrsn@gmail.com>
This commit is contained in:
Henry Chen 2023-12-30 18:04:23 +01:00 committed by GitHub
parent 046e7299e1
commit c34041510c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 9 deletions

View file

@ -14,3 +14,8 @@ tokio-stream = "0.1"
tower-http = { version = "0.5.0", features = ["fs", "trace"] } tower-http = { version = "0.5.0", features = ["fs", "trace"] }
tracing = "0.1" tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[dev-dependencies]
eventsource-stream = "0.2"
reqwest = { version = "0.11", features = ["stream"] }
reqwest-eventsource = "0.5"

View file

@ -3,6 +3,10 @@
//! ```not_rust //! ```not_rust
//! cargo run -p example-sse //! cargo run -p example-sse
//! ``` //! ```
//! Test with
//! ```not_rust
//! cargo test -p example-sse
//! ```
use axum::{ use axum::{
response::sse::{Event, Sse}, response::sse::{Event, Sse},
@ -26,15 +30,8 @@ async fn main() {
.with(tracing_subscriber::fmt::layer()) .with(tracing_subscriber::fmt::layer())
.init(); .init();
let assets_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("assets"); // build our application
let app = app();
let static_files_service = ServeDir::new(assets_dir).append_index_html_on_directories(true);
// build our application with a route
let app = Router::new()
.fallback_service(static_files_service)
.route("/sse", get(sse_handler))
.layer(TraceLayer::new_for_http());
// run it // run it
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
@ -44,6 +41,16 @@ async fn main() {
axum::serve(listener, app).await.unwrap(); axum::serve(listener, app).await.unwrap();
} }
fn app() -> Router {
let assets_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("assets");
let static_files_service = ServeDir::new(assets_dir).append_index_html_on_directories(true);
// build our application with a route
Router::new()
.fallback_service(static_files_service)
.route("/sse", get(sse_handler))
.layer(TraceLayer::new_for_http())
}
async fn sse_handler( async fn sse_handler(
TypedHeader(user_agent): TypedHeader<headers::UserAgent>, TypedHeader(user_agent): TypedHeader<headers::UserAgent>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> { ) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
@ -63,3 +70,58 @@ async fn sse_handler(
.text("keep-alive-text"), .text("keep-alive-text"),
) )
} }
#[cfg(test)]
mod tests {
use eventsource_stream::Eventsource;
use tokio::net::TcpListener;
use super::*;
#[tokio::test]
async fn integration_test() {
// A helper function that spawns our application in the background
async fn spawn_app(host: impl Into<String>) -> String {
let host = host.into();
// Bind to localhost at the port 0, which will let the OS assign an available port to us
let listener = TcpListener::bind(format!("{}:0", host)).await.unwrap();
// Retrieve the port assigned to us by the OS
let port = listener.local_addr().unwrap().port();
tokio::spawn(async {
axum::serve(listener, app()).await.unwrap();
});
// Returns address (e.g. http://127.0.0.1{random_port})
format!("http://{}:{}", host, port)
}
let listening_url = spawn_app("127.0.0.1").await;
let mut event_stream = reqwest::Client::new()
.get(&format!("{}/sse", listening_url))
.header("User-Agent", "integration_test")
.send()
.await
.unwrap()
.bytes_stream()
.eventsource()
.take(1);
let mut event_data: Vec<String> = vec![];
while let Some(event) = event_stream.next().await {
match event {
Ok(event) => {
// break the loop at the end of SSE stream
if event.data == "[DONE]" {
break;
}
event_data.push(event.data);
}
Err(_) => {
panic!("Error in event stream");
}
}
}
assert!(event_data[0] == "hi!");
}
}