mirror of
https://github.com/teloxide/teloxide.git
synced 2025-03-20 13:59:00 +01:00
Limit queue growth in throttle
This commit is contained in:
parent
669d6c95be
commit
72306c39d9
1 changed files with 3 additions and 1 deletions
|
@ -178,6 +178,7 @@ async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)
|
||||||
|
|
||||||
while !rx_is_closed || !queue.is_empty() {
|
while !rx_is_closed || !queue.is_empty() {
|
||||||
read_from_rx(&mut rx, &mut queue, &mut rx_is_closed).await;
|
read_from_rx(&mut rx, &mut queue, &mut rx_is_closed).await;
|
||||||
|
debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
|
||||||
|
|
||||||
// _Maybe_ we need to use `spawn_blocking` here, because there is
|
// _Maybe_ we need to use `spawn_blocking` here, because there is
|
||||||
// decent amount of blocking work. However _for now_ I've decided not
|
// decent amount of blocking work. However _for now_ I've decided not
|
||||||
|
@ -294,7 +295,8 @@ async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_c
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
// Don't grow queue bigger than the capacity to limit DOS posibility
|
||||||
|
while queue.len() < queue.capacity() {
|
||||||
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
|
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
|
||||||
match rx.recv().now_or_never() {
|
match rx.recv().now_or_never() {
|
||||||
Some(Some(req)) => queue.push(req),
|
Some(Some(req)) => queue.push(req),
|
||||||
|
|
Loading…
Add table
Reference in a new issue