refactor(throttle): small renamings

This commit is contained in:
Temirkhan Myrzamadi 2021-02-06 19:07:10 +06:00
parent 70563ee4a9
commit d8c16d420d

View file

@ -86,12 +86,14 @@ const DELAY: Duration = Duration::from_millis(250);
/// [@BotSupport]: https://t.me/botsupport
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct Limits {
/// Allowed messages in one chat per second
pub chat_s: u32,
/// Allowed messages per second
pub overall_s: u32,
/// Allowed messages in one chat per minute
pub chat_m: u32,
/// Allowed messages in one chat per second.
pub messages_per_sec_chat: u32,
/// Allowed messages in one chat per minute.
pub messages_per_min_chat: u32,
/// Allowed messages per second.
pub messages_per_sec_overall: u32,
}
/// Defaults are taken from [telegram documentation][tgdoc].
@ -100,9 +102,9 @@ pub struct Limits {
impl Default for Limits {
fn default() -> Self {
Self {
chat_s: 1,
overall_s: 30,
chat_m: 20,
messages_per_sec_chat: 1,
messages_per_sec_overall: 30,
messages_per_min_chat: 20,
}
}
}
@ -152,29 +154,28 @@ pub struct Throttle<B> {
queue: mpsc::Sender<(Id, Sender<Never>)>,
}
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>)>) {
async fn worker(limits: Limits, mut rx: mpsc::Receiver<(Id, Sender<Never>)>) {
// +- Same idea as in `Throttle::new`
let cap = limits.overall_s + (limits.overall_s / 4);
let capacity = limits.messages_per_sec_overall + (limits.messages_per_sec_overall / 4);
// FIXME(waffle): Make an research about data structures for this queue.
// Currently this is O(n) removing (n = number of elements
// stayed), amortized O(1) push (vec+vecrem).
let mut queue: Vec<(Id, Sender<Never>)> = Vec::with_capacity(cap as usize);
let mut queue: Vec<(Id, Sender<Never>)> = Vec::with_capacity(capacity as usize);
// I wish there was special data structure for history which removed the
// need in 2 hashmaps
// (waffle)
let mut history: VecDeque<(Id, Instant)> = VecDeque::new();
// hchats[chat] = history.iter().filter(|(c, _)| c == chat).count()
let mut hchats: HashMap<Id, u32> = HashMap::new();
let mut hchats_s = HashMap::new();
// set to true when `queue_rx` is closed
// set to true when `rx` is closed
let mut close = false;
while !close || !queue.is_empty() {
// If there are no pending requests we are just waiting
if queue.is_empty() {
match queue_rx.recv().await {
match rx.recv().await {
Some(req) => queue.push(req),
None => close = true,
}
@ -183,7 +184,7 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>)
// update local queue with latest requests
loop {
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
match queue_rx.recv().now_or_never() {
match rx.recv().now_or_never() {
Some(Some(req)) => queue.push(req),
// There are no items in queue
None => break,
@ -255,7 +256,7 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>)
.iter()
.take_while(|(_, time)| time > &sec_back)
.count() as u32;
let mut allowed = limits.overall_s.saturating_sub(used);
let mut allowed = limits.messages_per_sec_overall.saturating_sub(used);
if allowed == 0 {
hchats_s.clear();
@ -272,8 +273,8 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>)
while let Some(entry) = queue_rem.next() {
let chat = &entry.value().0;
let cond = {
hchats_s.get(chat).copied().unwrap_or(0) < limits.chat_s
&& hchats.get(chat).copied().unwrap_or(0) < limits.chat_m
hchats_s.get(chat).copied().unwrap_or(0) < limits.messages_per_sec_chat
&& hchats.get(chat).copied().unwrap_or(0) < limits.messages_per_min_chat
};
if cond {
@ -312,14 +313,11 @@ impl<B> Throttle<B> {
// so we won't lose performance when hitting limits.
//
// (I hope this makes sense) (waffle)
let buffer = limits.overall_s + (limits.overall_s / 8);
let (queue_tx, queue_rx) = mpsc::channel(buffer as usize);
let buffer_size = limits.messages_per_sec_overall + (limits.messages_per_sec_overall / 8);
let (tx, rx) = mpsc::channel(buffer_size as usize);
let worker = worker(limits, queue_rx);
let this = Self {
bot,
queue: queue_tx,
};
let worker = worker(limits, rx);
let this = Self { bot, queue: tx };
(this, worker)
}
@ -491,7 +489,7 @@ download_forward! {
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
enum Id {
Id(i64),
Ch(u64),
ChannelUsernameHash(u64),
}
impl From<&ChatId> for Id {
@ -502,7 +500,7 @@ impl From<&ChatId> for Id {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
username.hash(&mut hasher);
let hash = hasher.finish();
Id::Ch(hash)
Id::ChannelUsernameHash(hash)
}
}
}