Remove QueueStatus

This commit is contained in:
Waffle 2021-06-28 09:39:47 +03:00
parent 34e73748c3
commit 58cbf45bac

View file

@ -147,18 +147,6 @@ impl<B> Throttle<B> {
rx.await.ok();
}
/// Returns current queue status.
pub async fn queue_status(&self) -> QueueStatus {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(InfoMessage::QueueStatus { response: tx })
.await
.expect(WORKER_DIED);
rx.await.expect(WORKER_DIED)
}
}
const WORKER_DIED: &str = "worker died before last `Throttle` instance";
@ -196,32 +184,6 @@ impl Default for Limits {
}
}
/// Requests queue status.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct QueueStatus {
/// Number of requests waiting to be sent.
pub used: usize,
/// Capacity of the queue.
pub capacity: usize,
}
impl QueueStatus {
/// Returns `true` if the queue is full. Note that if the queue is full
/// number of requests may be greater than [`used`].
///
/// [`used`]: QueueStatus::used
pub fn is_full(&self) -> bool {
self.used >= self.capacity
}
/// Returns `true` if the queue is empty.
pub fn is_empty(&self) -> bool {
self.used == 0
}
}
const MINUTE: Duration = Duration::from_secs(60);
const SECOND: Duration = Duration::from_secs(1);
@ -235,7 +197,6 @@ const DELAY: Duration = Duration::from_millis(250);
enum InfoMessage {
GetLimits { response: Sender<Limits> },
SetLimits { new: Limits, response: Sender<()> },
QueueStatus { response: Sender<QueueStatus> },
}
type RequestsSent = u32;
@ -310,18 +271,14 @@ async fn worker(
while !rx_is_closed || !queue.is_empty() {
// FIXME(waffle):
// 1. This call 'blocks' requests executing untill all info messages are
// answered. Crucially, info messages are rare & easy to answer, but this
// still doesn't feel right.
//
// 2. If the `queue` is empty, `read_from_rx` call down below will 'block'
// 1. If the `queue` is empty, `read_from_rx` call down below will 'block'
// execution untill a request is sent. While the execution is 'blocked' no
// `InfoMessage`s could be answered.
//
// 3. If limits are descreased, idially we want to shrink queue.
// 2. If limits are descreased, ideally we want to shrink queue.
//
// *blocked in asyncronous way
answer_info(&mut info_rx, &mut queue, &mut limits).await;
answer_info(&mut info_rx, &mut limits);
read_from_rx(&mut rx, &mut queue, &mut rx_is_closed).await;
//debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
@ -433,11 +390,7 @@ async fn worker(
}
}
async fn answer_info(
rx: &mut mpsc::Receiver<InfoMessage>,
queue: &mut Vec<(ChatIdHash, RequestLock)>,
limits: &mut Limits,
) {
fn answer_info(rx: &mut mpsc::Receiver<InfoMessage>, limits: &mut Limits) {
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
while let Some(Some(req)) = rx.recv().now_or_never() {
// Errors are ignored with .ok(). Error means that the response channel
@ -448,12 +401,6 @@ async fn answer_info(
*limits = new;
response.send(()).ok()
}
InfoMessage::QueueStatus { response } => response
.send(QueueStatus {
used: queue.len(),
capacity: queue.capacity(),
})
.ok(),
};
}
}