Use tokio's try_recv instead of hacks

This commit is contained in:
Maybe Waffle 2022-01-17 15:59:39 +03:00
parent 435181fa0b
commit 439a345dee
2 changed files with 16 additions and 21 deletions

View file

@ -22,7 +22,7 @@ exclude = [
[dependencies]
futures = "0.3.5"
tokio = { version = "1.8.0", features = ["fs"] }
tokio = { version = "1.12.0", features = ["fs"] }
tokio-util = "0.6.0"
pin-project = "1.0.3"
bytes = "1.0.0"
@ -66,7 +66,7 @@ native-tls = ["reqwest/native-tls"]
nightly = []
# Throttling bot adaptor
throttle = ["vecrem"]
throttle = ["vecrem", "tokio/macros"]
# Trace bot adaptor
trace_adaptor = []

View file

@ -11,10 +11,10 @@ use either::Either;
use futures::{
future::{ready, BoxFuture},
task::{Context, Poll},
FutureExt,
};
use tokio::sync::{
mpsc,
mpsc::error::TryRecvError,
oneshot::{self, Receiver, Sender},
};
use url::Url;
@ -573,8 +573,7 @@ async fn worker<B>(
}
fn answer_info(rx: &mut mpsc::Receiver<InfoMessage>, limits: &mut Limits) {
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
while let Some(Some(req)) = tokio::task::unconstrained(rx.recv()).now_or_never() {
while let Ok(req) = rx.try_recv() {
// Errors are ignored with .ok(). Error means that the response channel
// is closed and the response isn't needed.
match req {
@ -595,17 +594,14 @@ async fn freeze(
bot: &impl Requester,
mut imm: Option<FreezeUntil>,
) {
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
while let Some(FreezeUntil {
until,
after,
chat,
mut retry,
}) = imm.take().or_else(|| {
tokio::task::unconstrained(rx.recv())
.now_or_never()
.flatten()
}) {
while let Some(freeze_until) = imm.take().or_else(|| rx.try_recv().ok()) {
let FreezeUntil {
until,
after,
chat,
mut retry,
} = freeze_until;
if let Some(slow_mode) = slow_mode.as_deref_mut() {
// TODO: do something with channels?...
if let hash @ ChatIdHash::Id(id) = chat {
@ -667,12 +663,11 @@ async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_c
// Don't grow queue bigger than the capacity to limit DOS possibility
while queue.len() < queue.capacity() {
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
match tokio::task::unconstrained(rx.recv()).now_or_never() {
Some(Some(req)) => queue.push(req),
Some(None) => *rx_is_closed = true,
match rx.try_recv() {
Ok(req) => queue.push(req),
Err(TryRecvError::Disconnected) => *rx_is_closed = true,
// There are no items in queue.
None => break,
Err(TryRecvError::Empty) => break,
}
}
}