From ed48de2f751c4a321d41e30e376fde1a4418a693 Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Mon, 7 Feb 2022 22:11:47 +0300 Subject: [PATCH] Simplify freezing --- src/adaptors/throttle/request.rs | 33 ++++++------------------------ src/adaptors/throttle/settings.rs | 1 + src/adaptors/throttle/worker.rs | 34 +++---------------------------- 3 files changed, 10 insertions(+), 58 deletions(-) diff --git a/src/adaptors/throttle/request.rs b/src/adaptors/throttle/request.rs index 49ba02c3..c400f282 100644 --- a/src/adaptors/throttle/request.rs +++ b/src/adaptors/throttle/request.rs @@ -144,40 +144,19 @@ where let after = Duration::from_secs(retry_after.into()); let until = Instant::now() + after; + // If we'll retry, we check that worker hasn't died at the start of the loop + // otherwise we don't care if the worker is alive or not + let _ = freeze.send(FreezeUntil { until, after, chat }).await; + if retry { log::warn!("Freezing, before retrying: {}", retry_after); - } - - let (lock, wait) = channel(); - - let r = freeze - .send(FreezeUntil { - until, - after, - chat, - retry: Some(lock), - }) - .await; - - if retry { - match r { - Ok(()) => { - // TODO: do we need `_retry` or `_freeze_tx`? - let (_retry, _freeze_tx) = wait.await; - } - // The worker has died, sleep until we may retry - Err(_) => { - log::error!("Worker has died while request w"); - tokio::time::sleep_until(until.into()).await; - } - } + tokio::time::sleep_until(until.into()).await; } } match res { - res @ Ok(_) => break res, Err(_) if retry && retry_after.is_some() => continue, - res @ Err(_) => break res, + res => break res, }; } } diff --git a/src/adaptors/throttle/settings.rs b/src/adaptors/throttle/settings.rs index 82ef7e4a..a1829cc1 100644 --- a/src/adaptors/throttle/settings.rs +++ b/src/adaptors/throttle/settings.rs @@ -15,6 +15,7 @@ type BoxedFuture = Pin + Send>>; /// /// let settings = throttle::Settings::default() /// .on_queue_full(|pending| async move { /* do something when internal queue is full */ }); +/// /// // use settings in `Throttle::with_settings` or other constructors /// # let _ = settings; /// ``` diff --git a/src/adaptors/throttle/worker.rs b/src/adaptors/throttle/worker.rs index eae5961f..85bca5bd 100644 --- a/src/adaptors/throttle/worker.rs +++ b/src/adaptors/throttle/worker.rs @@ -45,7 +45,6 @@ pub(super) struct FreezeUntil { pub(super) until: Instant, pub(super) after: Duration, pub(super) chat: ChatIdHash, - pub(super) retry: Option, } // Throttling is quite complicated. This comment describes the algorithm of the @@ -135,30 +134,18 @@ pub(super) async fn worker( // *blocked in asyncronous way answer_info(&mut info_rx, &mut limits); - freeze( - &mut freeze_rx, - &freeze_tx, - slow_mode.as_mut(), - &mut queue, - &bot, - None, - ) - .await; - loop { tokio::select! { - () = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break, freeze_until = freeze_rx.recv() => { freeze( &mut freeze_rx, - &freeze_tx, slow_mode.as_mut(), - &mut queue, &bot, freeze_until ) .await; }, + () = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break, } } //debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize); @@ -316,19 +303,12 @@ fn answer_info(rx: &mut mpsc::Receiver, limits: &mut Limits) { async fn freeze( rx: &mut mpsc::Receiver, - tx: &mpsc::Sender, mut slow_mode: Option<&mut HashMap>, - queue: &mut Vec<(ChatIdHash, RequestLock)>, bot: &impl Requester, mut imm: Option, ) { while let Some(freeze_until) = imm.take().or_else(|| rx.try_recv().ok()) { - let FreezeUntil { - until, - after, - chat, - mut retry, - } = freeze_until; + let FreezeUntil { until, after, chat } = freeze_until; if let Some(slow_mode) = slow_mode.as_deref_mut() { // TODO: do something with channels?... @@ -359,9 +339,7 @@ async fn freeze( // Do not sleep if slow mode is enabled since the freeze is most likely caused // by the said slow mode and not by the global limits. - if slow_mode_enabled_and_likely_the_cause { - queue.extend(Some(chat).zip(retry.take())); - } else { + if !slow_mode_enabled_and_likely_the_cause { log::warn!( "freezing the bot for approximately {:?} due to `RetryAfter` error from telegram", after @@ -370,12 +348,6 @@ async fn freeze( tokio::time::sleep_until(until.into()).await; log::warn!("unfreezing the bot"); - - if let Some(lock) = retry { - // Since we are already retrying the request, retries are obviously turned on. - let retry = true; - let _ = lock.unlock(retry, tx.clone()); - } } } }