Use futures::future::select instead of tokio::select!

Pros: less macros, fixes the missing feature issue.
Cons: a bit more code because `fututres` is an annoying crate
      which does not use `either::Either` and provides its
      own `Either` which does not have `map_either`, or
      basically anything for that matter.
This commit is contained in:
Maybe Waffle 2023-10-04 19:37:18 +04:00
parent d21ca11a54
commit cec2a018ff
4 changed files with 52 additions and 20 deletions

View file

@ -31,7 +31,7 @@ native-tls = ["reqwest/native-tls"]
nightly = [] nightly = []
# Throttling bot adaptor # Throttling bot adaptor
throttle = ["vecrem", "tokio/macros"] throttle = ["vecrem"]
# Trace bot adaptor # Trace bot adaptor
trace_adaptor = [] trace_adaptor = []

View file

@ -1,8 +1,11 @@
use std::{ use std::{
collections::{hash_map::Entry, HashMap, VecDeque}, collections::{hash_map::Entry, HashMap, VecDeque},
pin::pin,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use either::Either;
use futures::{future, FutureExt as _};
use tokio::sync::{mpsc, mpsc::error::TryRecvError, oneshot::Sender}; use tokio::sync::{mpsc, mpsc::error::TryRecvError, oneshot::Sender};
use vecrem::VecExt; use vecrem::VecExt;
@ -129,17 +132,19 @@ pub(super) async fn worker<B>(
answer_info(&mut info_rx, &mut limits); answer_info(&mut info_rx, &mut limits);
loop { loop {
tokio::select! { let res = future::select(
freeze_until = freeze_rx.recv() => { pin!(freeze_rx.recv()),
freeze( pin!(read_from_rx(&mut rx, &mut queue, &mut rx_is_closed)),
&mut freeze_rx,
slow_mode.as_mut(),
&bot,
freeze_until
) )
.await; .map(either)
}, .await
() = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break, .map_either(|l| l.0, |r| r.0);
match res {
Either::Left(freeze_until) => {
freeze(&mut freeze_rx, slow_mode.as_mut(), &bot, freeze_until).await;
}
Either::Right(()) => break,
} }
} }
//debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize); //debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
@ -372,6 +377,13 @@ async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_c
} }
} }
fn either<L, R>(x: future::Either<L, R>) -> Either<L, R> {
match x {
future::Either::Left(l) => Either::Left(l),
future::Either::Right(r) => Either::Right(r),
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[tokio::test] #[tokio::test]

View file

@ -94,6 +94,7 @@ futures = "0.3.15"
pin-project = "1.0" pin-project = "1.0"
serde_with_macros = "1.4" serde_with_macros = "1.4"
aquamarine = "0.1.11" aquamarine = "0.1.11"
either = "1.9.0"
sqlx = { version = "0.6", optional = true, default-features = false, features = [ sqlx = { version = "0.6", optional = true, default-features = false, features = [
"macros", "macros",

View file

@ -10,7 +10,12 @@ use crate::{
}; };
use dptree::di::{DependencyMap, DependencySupplier}; use dptree::di::{DependencyMap, DependencySupplier};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use either::Either;
use futures::{
future::{self, BoxFuture},
stream::FuturesUnordered,
FutureExt as _, StreamExt as _,
};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use std::{ use std::{
@ -19,6 +24,7 @@ use std::{
future::Future, future::Future,
hash::Hash, hash::Hash,
ops::{ControlFlow, Deref}, ops::{ControlFlow, Deref},
pin::pin,
sync::{ sync::{
atomic::{AtomicBool, AtomicU32, Ordering}, atomic::{AtomicBool, AtomicU32, Ordering},
Arc, Arc,
@ -321,12 +327,18 @@ where
loop { loop {
self.remove_inactive_workers_if_needed().await; self.remove_inactive_workers_if_needed().await;
tokio::select! { let res = future::select(stream.next(), pin!(self.state.wait_for_changes()))
upd = stream.next() => match upd { .map(either)
None => break, .await
.map_either(|l| l.0, |r| r.0);
match res {
Either::Left(upd) => match upd {
Some(upd) => self.process_update(upd, &update_listener_error_handler).await, Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
None => break,
}, },
() = self.state.wait_for_changes() => if self.state.is_shutting_down() { Either::Right(()) => {
if self.state.is_shutting_down() {
if let Some(token) = stop_token.take() { if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching..."); log::debug!("Start shutting down dispatching...");
token.stop(); token.stop();
@ -335,6 +347,7 @@ where
} }
} }
} }
}
self.workers self.workers
.drain() .drain()
@ -578,6 +591,12 @@ async fn handle_update<Err>(
} }
} }
fn either<L, R>(x: future::Either<L, R>) -> Either<L, R> {
match x {
future::Either::Left(l) => Either::Left(l),
future::Either::Right(r) => Either::Right(r),
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::convert::Infallible; use std::convert::Infallible;