[throttle] queue optimizations

Waffle 2020-09-30 19:21:35 +03:00
parent f270613e7e
commit 3f9db1f31b

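The diff replaces the worker's pending queue, a VecDeque<(ChatId, Sender<()>)>, with a Vec<Option<(ChatId, Sender<()>)>> that is compacted in place: dispatched entries are removed from their slots with Option::take, entries that must stay queued are swapped into a growing prefix, and the vector is truncated once per pass, instead of paying the element shifting of VecDeque::remove(i) for every dispatched request. The sketch below is not code from this commit; it is a minimal standalone illustration of the same take/swap/truncate pattern, and the names drain_ready, is_ready, process, budget, and kept are hypothetical stand-ins for the per-chat limit checks and the Sender handshake:

    // Illustrative sketch (not from the commit): compact a Vec<Option<T>> in place,
    // processing at most `budget` ready items and keeping the rest in their original order.
    fn drain_ready<T>(
        queue: &mut Vec<Option<T>>,
        mut budget: u32,
        mut is_ready: impl FnMut(&T) -> bool,
        mut process: impl FnMut(T),
    ) {
        // Length of the compacted prefix of items that stay queued.
        let mut kept = 0;
        for i in 0..queue.len() {
            let ready = {
                // Every slot is `Some` until it is taken below.
                let item = queue[i].as_ref().unwrap();
                budget > 0 && is_ready(item)
            };
            if ready {
                // Remove the item without shifting the tail (O(1), unlike Vec/VecDeque::remove).
                process(queue[i].take().unwrap());
                budget -= 1;
                if budget == 0 {
                    // Budget exhausted: slide the unvisited tail into the kept prefix and stop.
                    for j in (i + 1)..queue.len() {
                        queue.swap(j, kept);
                        kept += 1;
                    }
                    break;
                }
            } else {
                // Keep this item: move it to the end of the kept prefix.
                queue.swap(i, kept);
                kept += 1;
            }
        }
        // Drop the emptied tail slots; remaining items keep their relative order.
        queue.truncate(kept);
    }

    fn main() {
        let mut queue: Vec<Option<u32>> = (1..=6).map(Some).collect();
        // Budget of 2: the even numbers 2 and 4 are dispatched; 6 is never reached.
        drain_ready(&mut queue, 2, |n| *n % 2 == 0, |n| println!("sent {n}"));
        let left: Vec<u32> = queue.into_iter().map(Option::unwrap).collect();
        assert_eq!(left, vec![1, 3, 5, 6]);
        println!("still queued: {left:?}");
    }

Unlike repeated remove(i) calls, the whole pass is a single left-to-right sweep, and the entries that stay queued keep their first-come order.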

@@ -59,7 +59,7 @@ async fn worker(
     // FIXME: remove unnecessary ChatId clones
     // FIXME: struct with fast random remove and append-to-the-end
-    let mut queue: VecDeque<(ChatId, Sender<()>)> = VecDeque::new(); // FIXME: with_cap
+    let mut queue: Vec<Option<(ChatId, Sender<()>)>> = Vec::new(); // FIXME: with_cap
     let mut history: VecDeque<(ChatId, Instant)> = VecDeque::new();
     // hchats[chat] = history.iter().filter(|(c, _)| c == chat).count()
@@ -68,13 +68,13 @@ async fn worker(
     loop {
         // If there are no pending requests we are just waiting
         if queue.is_empty() {
-            queue.push_back(queue_rx.recv().await.unwrap());
+            queue.push(Some(queue_rx.recv().await.unwrap()));
         }
         // update local queue with latest requests
         while let Ok(e) = queue_rx.try_recv() {
             // FIXME: properly check for errors (stop when the bot's sender is dropped?)
-            queue.push_back(e)
+            queue.push(Some(e))
         }
         let now = Instant::now();
@@ -90,13 +90,13 @@ async fn worker(
                 if let Entry::Occupied(entry) = hchats
                     .entry(chat)
                     .and_modify(|count| { *count -= 1; }) {
-                    if entry.get() == 0 { entry.remove_entry(); }
+                    if *entry.get() == 0 { entry.remove_entry(); }
                 }
             }
         }
         // as truncates which is ok since in case of truncation it would always be >= limits.overall_s
-        let mut allowed = limits.overall_s.saturating_sub(dbg!(&history).iter().take_while(|(_, time)| time > &sec_back).count() as u32);
+        let mut allowed = limits.overall_s.saturating_sub(history.iter().take_while(|(_, time)| time > &sec_back).count() as u32);
         if allowed == 0 {
             delay_for(DELAY).await;
@@ -110,30 +110,45 @@ async fn worker(
                 .or_insert(0) += 1;
         }
-        let mut i = 0;
-        while allowed > 0 && i < queue.len() {
-            let chat = &queue[i].0;
-            if hchats_s
-                .get(chat)
-                .copied()
-                .unwrap_or(0) < limits.chat_s &&
-                hchats
+        let mut empty = 0;
+        for i in 0..queue.len() {
+            let chat = &queue[i].as_ref().unwrap().0;
+            let cond = {
+                hchats_s
                 .get(chat)
                 .copied()
-                .unwrap_or(0) < limits.chat_m
-            {
-                let chat = chat.clone();
-                *hchats_s.entry(chat.clone()).or_insert(0) += 1;
-                *hchats.entry(chat.clone()).or_insert(0) += 1;
-                history.push_back((chat, Instant::now()));
-                queue.remove(i).unwrap().1.send(());
+                .unwrap_or(0) < limits.chat_s &&
+                hchats
+                .get(chat)
+                .copied()
+                .unwrap_or(0) < limits.chat_m
+            };
+            if cond {
+                {
+                    *hchats_s.entry(chat.clone()).or_insert(0) += 1;
+                    *hchats.entry(chat.clone()).or_insert(0) += 1;
+                    history.push_back((chat.clone(), Instant::now()));
+                }
+                queue[i].take().unwrap().1.send(());
                 allowed -= 1;
+                if allowed == 0 {
+                    if empty != i {
+                        // FIXME: this could be more optimal
+                        for j in i..queue.len() {
+                            queue.swap(j, empty);
+                            empty += 1;
+                        }
+                    }
+                    break;
+                }
             } else {
-                i += 1;
+                queue.swap(i, empty);
+                empty += 1;
             }
         }
+        queue.truncate(empty);
         delay_for(DELAY).await;
     }
@@ -283,6 +298,7 @@ impl<R: Request> Future for LimitedSend<R> {
-use chan_send::{ChanSend, SendTy as _};
+use crate::bot::limits::chan_send::SendTy;
 use std::collections::hash_map::Entry;
+use core::mem;
 mod chan_send {
     use tokio::sync::mpsc;