Simplify freezing

This commit is contained in:
Maybe Waffle 2022-02-07 22:11:47 +03:00
parent dcbef82e43
commit ed48de2f75
3 changed files with 10 additions and 58 deletions

View file

@ -144,40 +144,19 @@ where
let after = Duration::from_secs(retry_after.into()); let after = Duration::from_secs(retry_after.into());
let until = Instant::now() + after; let until = Instant::now() + after;
// If we'll retry, we check that worker hasn't died at the start of the loop
// otherwise we don't care if the worker is alive or not
let _ = freeze.send(FreezeUntil { until, after, chat }).await;
if retry { if retry {
log::warn!("Freezing, before retrying: {}", retry_after); log::warn!("Freezing, before retrying: {}", retry_after);
} tokio::time::sleep_until(until.into()).await;
let (lock, wait) = channel();
let r = freeze
.send(FreezeUntil {
until,
after,
chat,
retry: Some(lock),
})
.await;
if retry {
match r {
Ok(()) => {
// TODO: do we need `_retry` or `_freeze_tx`?
let (_retry, _freeze_tx) = wait.await;
}
// The worker has died, sleep until we may retry
Err(_) => {
log::error!("Worker has died while request w");
tokio::time::sleep_until(until.into()).await;
}
}
} }
} }
match res { match res {
res @ Ok(_) => break res,
Err(_) if retry && retry_after.is_some() => continue, Err(_) if retry && retry_after.is_some() => continue,
res @ Err(_) => break res, res => break res,
}; };
} }
} }

View file

@ -15,6 +15,7 @@ type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// ///
/// let settings = throttle::Settings::default() /// let settings = throttle::Settings::default()
/// .on_queue_full(|pending| async move { /* do something when internal queue is full */ }); /// .on_queue_full(|pending| async move { /* do something when internal queue is full */ });
///
/// // use settings in `Throttle::with_settings` or other constructors /// // use settings in `Throttle::with_settings` or other constructors
/// # let _ = settings; /// # let _ = settings;
/// ``` /// ```

View file

@ -45,7 +45,6 @@ pub(super) struct FreezeUntil {
pub(super) until: Instant, pub(super) until: Instant,
pub(super) after: Duration, pub(super) after: Duration,
pub(super) chat: ChatIdHash, pub(super) chat: ChatIdHash,
pub(super) retry: Option<RequestLock>,
} }
// Throttling is quite complicated. This comment describes the algorithm of the // Throttling is quite complicated. This comment describes the algorithm of the
@ -135,30 +134,18 @@ pub(super) async fn worker<B>(
// *blocked in asyncronous way // *blocked in asyncronous way
answer_info(&mut info_rx, &mut limits); answer_info(&mut info_rx, &mut limits);
freeze(
&mut freeze_rx,
&freeze_tx,
slow_mode.as_mut(),
&mut queue,
&bot,
None,
)
.await;
loop { loop {
tokio::select! { tokio::select! {
() = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break,
freeze_until = freeze_rx.recv() => { freeze_until = freeze_rx.recv() => {
freeze( freeze(
&mut freeze_rx, &mut freeze_rx,
&freeze_tx,
slow_mode.as_mut(), slow_mode.as_mut(),
&mut queue,
&bot, &bot,
freeze_until freeze_until
) )
.await; .await;
}, },
() = read_from_rx(&mut rx, &mut queue, &mut rx_is_closed) => break,
} }
} }
//debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize); //debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
@ -316,19 +303,12 @@ fn answer_info(rx: &mut mpsc::Receiver<InfoMessage>, limits: &mut Limits) {
async fn freeze( async fn freeze(
rx: &mut mpsc::Receiver<FreezeUntil>, rx: &mut mpsc::Receiver<FreezeUntil>,
tx: &mpsc::Sender<FreezeUntil>,
mut slow_mode: Option<&mut HashMap<ChatIdHash, (Duration, Instant)>>, mut slow_mode: Option<&mut HashMap<ChatIdHash, (Duration, Instant)>>,
queue: &mut Vec<(ChatIdHash, RequestLock)>,
bot: &impl Requester, bot: &impl Requester,
mut imm: Option<FreezeUntil>, mut imm: Option<FreezeUntil>,
) { ) {
while let Some(freeze_until) = imm.take().or_else(|| rx.try_recv().ok()) { while let Some(freeze_until) = imm.take().or_else(|| rx.try_recv().ok()) {
let FreezeUntil { let FreezeUntil { until, after, chat } = freeze_until;
until,
after,
chat,
mut retry,
} = freeze_until;
if let Some(slow_mode) = slow_mode.as_deref_mut() { if let Some(slow_mode) = slow_mode.as_deref_mut() {
// TODO: do something with channels?... // TODO: do something with channels?...
@ -359,9 +339,7 @@ async fn freeze(
// Do not sleep if slow mode is enabled since the freeze is most likely caused // Do not sleep if slow mode is enabled since the freeze is most likely caused
// by the said slow mode and not by the global limits. // by the said slow mode and not by the global limits.
if slow_mode_enabled_and_likely_the_cause { if !slow_mode_enabled_and_likely_the_cause {
queue.extend(Some(chat).zip(retry.take()));
} else {
log::warn!( log::warn!(
"freezing the bot for approximately {:?} due to `RetryAfter` error from telegram", "freezing the bot for approximately {:?} due to `RetryAfter` error from telegram",
after after
@ -370,12 +348,6 @@ async fn freeze(
tokio::time::sleep_until(until.into()).await; tokio::time::sleep_until(until.into()).await;
log::warn!("unfreezing the bot"); log::warn!("unfreezing the bot");
if let Some(lock) = retry {
// Since we are already retrying the request, retries are obviously turned on.
let retry = true;
let _ = lock.unlock(retry, tx.clone());
}
} }
} }
} }