diff --git a/Cargo.toml b/Cargo.toml index 01f3f1ca..a247f0ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ authors = [ [dependencies] futures = "0.3.5" -tokio = { version = "0.2.21", features = ["fs", "stream"] } +tokio = { version = "0.2.21", features = ["fs", "stream", "full"] } tokio-util = "0.3.1" pin-project = "0.4.23" bytes = "0.5.5" diff --git a/src/bot/limits.rs b/src/bot/limits.rs index b99586b1..3cdf0f54 100644 --- a/src/bot/limits.rs +++ b/src/bot/limits.rs @@ -315,7 +315,7 @@ where { type Err = R::Err; type Send = ThrottlingSend; - type SendRef = ThrottlingSend; + type SendRef = ThrottlingSendRef; fn send(self) -> Self::Send { let (tx, rx) = channel(); @@ -324,7 +324,18 @@ where } fn send_ref(&self) -> Self::SendRef { - unimplemented!() + let (tx, rx) = channel(); + let send = self.1.clone().send_t((self.0.payload_ref().get_chat_id().clone(), tx)); + + // As we can't move self.0 (request) out, as we do in `send` we are + // forced to call `send_ref()`. This may have overhead and/or lead to + // wrong results because `R::send_ref` does the send. + // + // However `Request` documentation explicitly notes that `send{,_ref}` + // should **not** do any kind of work, so it's ok. + let request = self.0.send_ref(); + + ThrottlingSendRef::Registering { request, send, wait: rx } } } @@ -391,6 +402,70 @@ impl Future for ThrottlingSend { } } +#[pin_project::pin_project(project = SendRefProj, project_replace = SendRefRepl)] +pub enum ThrottlingSendRef { + Registering { + request: R::SendRef, + #[pin] + send: ChanSend, + wait: Receiver<()>, + }, + Pending { + request: R::SendRef, + #[pin] + wait: Receiver<()>, + }, + Sent { + #[pin] + fut: R::SendRef, + }, + Done, +} + +impl Future for ThrottlingSendRef { + type Output = Result, R::Err>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().project() { + SendRefProj::Registering { request: _, send, wait: _ } => match send.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => { + // FIXME(waffle): remove unwrap + r.unwrap(); + if let SendRefRepl::Registering { request, send: _, wait } = + self.as_mut().project_replace(ThrottlingSendRef::Done) + { + self.as_mut().project_replace(ThrottlingSendRef::Pending { request, wait }); + } + + self.poll(cx) + } + }, + SendRefProj::Pending { request: _, wait } => match wait.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => { + // FIXME(waffle): remove unwrap + r.unwrap(); + if let SendRefRepl::Pending { request, wait: _ } = + self.as_mut().project_replace(ThrottlingSendRef::Done) + { + self.as_mut().project_replace(ThrottlingSendRef::Sent { fut: request }); + } + + self.poll(cx) + } + }, + SendRefProj::Sent { fut } => { + let res = futures::ready!(fut.poll(cx)); + self.set(ThrottlingSendRef::Done); + Poll::Ready(res) + } + SendRefProj::Done => Poll::Pending, + } + } +} + + mod chan_send { use std::{future::Future, pin::Pin}; diff --git a/src/requests/request.rs b/src/requests/request.rs index b29a6c00..da2be6fa 100644 --- a/src/requests/request.rs +++ b/src/requests/request.rs @@ -3,7 +3,18 @@ use std::future::Future; use crate::requests::{HasPayload, Output}; /// A ready-to-send telegram request. +/// // FIXME(waffle): Write better doc for the trait +/// +/// ## Implementation notes +/// +/// It is not recommended to do any kind of _work_ in `send` or `send_ref`. +/// Instead it's recommended to do all the (possible) stuff in the returned +/// future. In other words — keep it lazy. +/// +/// This is crucial for request wrappers which may want to cancel and/or never +/// send the underlying request. E.g.: [`Throttle`]'s `send_ref` calls +/// `B::send_ref` while _not_ meaning to really send the request right now. #[cfg_attr(all(docsrs, feature = "nightly"), doc(spotlight))] pub trait Request: HasPayload { /*