diff --git a/src/bot/limits.rs b/src/bot/limits.rs index 539c6dee..be8e3a12 100644 --- a/src/bot/limits.rs +++ b/src/bot/limits.rs @@ -59,7 +59,7 @@ async fn worker( // FIXME: remove unnecessary ChatId clones // FIXME: struct with fast random remove and append-to-the-end - let mut queue: VecDeque<(ChatId, Sender<()>)> = VecDeque::new(); // FIXME: with_cap + let mut queue: Vec<Option<(ChatId, Sender<()>)>> = Vec::new(); // FIXME: with_cap let mut history: VecDeque<(ChatId, Instant)> = VecDeque::new(); // hchats[chat] = history.iter().filter(|(c, _)| c == chat).count() @@ -68,13 +68,13 @@ async fn worker( loop { // If there are no pending requests we are just waiting if queue.is_empty() { - queue.push_back(queue_rx.recv().await.unwrap()); + queue.push(Some(queue_rx.recv().await.unwrap())); } // update local queue with latest requests while let Ok(e) = queue_rx.try_recv() { // FIXME: properly check for errors (stop when the bot's sender is dropped?) - queue.push_back(e) + queue.push(Some(e)) } let now = Instant::now(); @@ -90,13 +90,13 @@ async fn worker( if let Entry::Occupied(entry) = hchats .entry(chat) .and_modify(|count| { *count -= 1; }) { - if entry.get() == 0 { entry.remove_entry(); } + if *entry.get() == 0 { entry.remove_entry(); } } } } // as truncates which is ok since in case of truncation it would always be >= limits.overall_s - let mut allowed = limits.overall_s.saturating_sub(dbg!(&history).iter().take_while(|(_, time)| time > &sec_back).count() as u32); + let mut allowed = limits.overall_s.saturating_sub(history.iter().take_while(|(_, time)| time > &sec_back).count() as u32); if allowed == 0 { delay_for(DELAY).await; @@ -110,30 +110,45 @@ async fn worker( .or_insert(0) += 1; } - let mut i = 0; - while allowed > 0 && i < queue.len() { - let chat = &queue[i].0; - - if hchats_s - .get(chat) - .copied() - .unwrap_or(0) < limits.chat_s && - hchats + let mut empty = 0; + for i in 0..queue.len() { + let chat = &queue[i].as_ref().unwrap().0; + let cond = { + hchats_s .get(chat) .copied() - .unwrap_or(0) < limits.chat_m - { - let chat = chat.clone(); - *hchats_s.entry(chat.clone()).or_insert(0) += 1; - *hchats.entry(chat.clone()).or_insert(0) += 1; + .unwrap_or(0) < limits.chat_s && + hchats + .get(chat) + .copied() + .unwrap_or(0) < limits.chat_m + }; + + if cond { + { + *hchats_s.entry(chat.clone()).or_insert(0) += 1; + *hchats.entry(chat.clone()).or_insert(0) += 1; + history.push_back((chat.clone(), Instant::now())); + } + queue[i].take().unwrap().1.send(()); - history.push_back((chat, Instant::now())); - queue.remove(i).unwrap().1.send(()); allowed -= 1; + if allowed == 0 { + if empty != i { + // FIXME: this could be more optimal + for j in i..queue.len() { + queue.swap(j, empty); + empty += 1; + } + } + break; + } } else { - i += 1; + queue.swap(i, empty); + empty += 1; } } + queue.truncate(empty); delay_for(DELAY).await; } @@ -283,6 +298,7 @@ impl<R: Request> Future for LimitedSend<R> { use chan_send::{ChanSend, SendTy as _}; use crate::bot::limits::chan_send::SendTy; use std::collections::hash_map::Entry; +use core::mem; mod chan_send { use tokio::sync::mpsc;