diff --git a/Cargo.toml b/Cargo.toml index e9a9a648..3003d6ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,12 +17,11 @@ authors = [ [dependencies] futures = "0.3.5" -tokio = { version = "0.2.21", features = ["fs", "stream"] } -tokio-util = "0.3.1" -pin-project = "0.4.23" -bytes = "0.5.5" -async-trait = "0.1.36" -reqwest = { version = "0.10.6", features = ["json", "stream"] } +tokio = { version = "1.0.1", features = ["fs"] } +tokio-util = "0.6.0" +pin-project = "1.0.3" +bytes = "1.0.0" +reqwest = { version = "0.11.0", features = ["json", "stream", "multipart"] } serde = { version = "1.0.114", features = ["derive"] } serde_json = "1.0.55" @@ -32,7 +31,7 @@ uuid = { version = "0.8.1", features = ["v4"] } # for attaching input files derive_more = "0.99.9" mime = "0.3.16" thiserror = "1.0.20" -once_cell = "1.4.0" +once_cell = "1.5.0" never = "0.1.0" # FIXME(waffle): use crates.io once published diff --git a/src/adaptors/throttle.rs b/src/adaptors/throttle.rs index a71fafa5..75d79a80 100644 --- a/src/adaptors/throttle.rs +++ b/src/adaptors/throttle.rs @@ -6,14 +6,14 @@ use std::{ time::{Duration, Instant}, }; -use futures::task::{Context, Poll}; +use futures::{ + task::{Context, Poll}, + FutureExt, +}; use never::Never; -use tokio::{ - sync::{ - mpsc::{self, error::TryRecvError}, - oneshot::{channel, Receiver, Sender}, - }, - time::delay_for, +use tokio::sync::{ + mpsc, + oneshot::{channel, Receiver, Sender}, }; use vecrem::VecExt; @@ -182,10 +182,13 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>) // update local queue with latest requests loop { - match queue_rx.try_recv() { - Ok(req) => queue.push(req), - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Closed) => close = true, + // FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350 + match queue_rx.recv().now_or_never() { + Some(Some(req)) => queue.push(req), + // There are no items in queue + None => break, + // The queue was closed + Some(None) => close = true, } } @@ -256,7 +259,7 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>) if allowed == 0 { hchats_s.clear(); - delay_for(DELAY).await; + tokio::time::sleep(DELAY).await; continue; } @@ -296,7 +299,7 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>) // It's easier to just recompute last second stats, instead of keeping // track of it alongside with minute stats, so we just throw this away. hchats_s.clear(); - delay_for(DELAY).await; + tokio::time::sleep(DELAY).await; } } @@ -764,7 +767,7 @@ mod chan_send { #[cfg(feature = "nightly")] { fn def( - mut sender: mpsc::Sender<(Id, Sender<Never>)>, + sender: mpsc::Sender<(Id, Sender<Never>)>, val: (Id, Sender<Never>), ) -> Inner { async move { sender.send(val).await } @@ -773,7 +776,7 @@ mod chan_send { } #[cfg(not(feature = "nightly"))] { - let mut this = self; + let this = self; return ChanSend(Box::pin(async move { this.send(val).await })); } } diff --git a/src/bot/mod.rs b/src/bot/mod.rs index 00032315..07c78829 100644 --- a/src/bot/mod.rs +++ b/src/bot/mod.rs @@ -202,7 +202,7 @@ pub(crate) fn sound_bot() -> ClientBuilder { ClientBuilder::new() .connect_timeout(connect_timeout) .timeout(Duration::from_secs(connect_timeout.as_secs() + timeout + 2)) - .tcp_nodelay_(true) + .tcp_nodelay(true) .default_headers(headers) } diff --git a/src/net/request.rs b/src/net/request.rs index 15fb3d27..b8726001 100644 --- a/src/net/request.rs +++ b/src/net/request.rs @@ -56,7 +56,7 @@ where T: DeserializeOwned, { if response.status().is_server_error() { - tokio::time::delay_for(DELAY_ON_SERVER_ERROR).await; + tokio::time::sleep(DELAY_ON_SERVER_ERROR).await; } serde_json::from_str::<TelegramResponse<T>>(