mirror of
https://github.com/tokio-rs/axum.git
synced 2025-03-13 19:27:53 +01:00
add more comments to chat example (#1665)
This commit is contained in:
parent
6638b274db
commit
7192c590c9
1 changed files with 19 additions and 9 deletions
|
@ -26,7 +26,9 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|||
|
||||
// Our shared state
|
||||
struct AppState {
|
||||
// We require unique usernames. This tracks which usernames have been taken.
|
||||
user_set: Mutex<HashSet<String>>,
|
||||
// Channel used to send messages to all connected clients.
|
||||
tx: broadcast::Sender<String>,
|
||||
}
|
||||
|
||||
|
@ -40,6 +42,7 @@ async fn main() {
|
|||
.with(tracing_subscriber::fmt::layer())
|
||||
.init();
|
||||
|
||||
// Set up application state for use with with_state().
|
||||
let user_set = Mutex::new(HashSet::new());
|
||||
let (tx, _rx) = broadcast::channel(100);
|
||||
|
||||
|
@ -65,8 +68,11 @@ async fn websocket_handler(
|
|||
ws.on_upgrade(|socket| websocket(socket, state))
|
||||
}
|
||||
|
||||
// This function deals with a single websocket connection, i.e., a single
|
||||
// connected client / user, for which we will spawn two independent tasks (for
|
||||
// receiving / sending chat messages).
|
||||
async fn websocket(stream: WebSocket, state: Arc<AppState>) {
|
||||
// By splitting we can send and receive at the same time.
|
||||
// By splitting, we can send and receive at the same time.
|
||||
let (mut sender, mut receiver) = stream.split();
|
||||
|
||||
// Username gets set in the receive loop, if it's valid.
|
||||
|
@ -91,15 +97,17 @@ async fn websocket(stream: WebSocket, state: Arc<AppState>) {
|
|||
}
|
||||
}
|
||||
|
||||
// Subscribe before sending joined message.
|
||||
// We subscribe *before* sending the "joined" message, so that we will also
|
||||
// display it to our client.
|
||||
let mut rx = state.tx.subscribe();
|
||||
|
||||
// Send joined message to all subscribers.
|
||||
// Now send the "joined" message to all subscribers.
|
||||
let msg = format!("{} joined.", username);
|
||||
tracing::debug!("{}", msg);
|
||||
let _ = state.tx.send(msg);
|
||||
|
||||
// This task will receive broadcast messages and send text message to our client.
|
||||
// Spawn the first task that will receive broadcast messages and send text
|
||||
// messages over the websocket to our client.
|
||||
let mut send_task = tokio::spawn(async move {
|
||||
while let Ok(msg) = rx.recv().await {
|
||||
// In any websocket error, break loop.
|
||||
|
@ -109,11 +117,12 @@ async fn websocket(stream: WebSocket, state: Arc<AppState>) {
|
|||
}
|
||||
});
|
||||
|
||||
// Clone things we want to pass to the receiving task.
|
||||
// Clone things we want to pass (move) to the receiving task.
|
||||
let tx = state.tx.clone();
|
||||
let name = username.clone();
|
||||
|
||||
// This task will receive messages from client and send them to broadcast subscribers.
|
||||
// Spawn a task that takes messages from the websocket, prepends the user
|
||||
// name, and sends them to all broadcast subscribers.
|
||||
let mut recv_task = tokio::spawn(async move {
|
||||
while let Some(Ok(Message::Text(text))) = receiver.next().await {
|
||||
// Add username before message.
|
||||
|
@ -121,17 +130,18 @@ async fn websocket(stream: WebSocket, state: Arc<AppState>) {
|
|||
}
|
||||
});
|
||||
|
||||
// If any one of the tasks exit, abort the other.
|
||||
// If any one of the tasks run to completion, we abort the other.
|
||||
tokio::select! {
|
||||
_ = (&mut send_task) => recv_task.abort(),
|
||||
_ = (&mut recv_task) => send_task.abort(),
|
||||
};
|
||||
|
||||
// Send user left message.
|
||||
// Send "user left" message (similar to "joined" above).
|
||||
let msg = format!("{} left.", username);
|
||||
tracing::debug!("{}", msg);
|
||||
let _ = state.tx.send(msg);
|
||||
// Remove username from map so new clients can take it.
|
||||
|
||||
// Remove username from map so new clients can take it again.
|
||||
state.user_set.lock().unwrap().remove(&username);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue