[throttle] implement send_ref

This commit is contained in:
Waffle 2020-10-01 17:21:30 +03:00
parent 247868a815
commit 7fdf9cd9a6
3 changed files with 89 additions and 3 deletions

View file

@ -17,7 +17,7 @@ authors = [
[dependencies]
futures = "0.3.5"
tokio = { version = "0.2.21", features = ["fs", "stream"] }
tokio = { version = "0.2.21", features = ["fs", "stream", "full"] }
tokio-util = "0.3.1"
pin-project = "0.4.23"
bytes = "0.5.5"

View file

@ -315,7 +315,7 @@ where
{
type Err = R::Err;
type Send = ThrottlingSend<R>;
type SendRef = ThrottlingSend<R>;
type SendRef = ThrottlingSendRef<R>;
fn send(self) -> Self::Send {
let (tx, rx) = channel();
@ -324,7 +324,18 @@ where
}
fn send_ref(&self) -> Self::SendRef {
unimplemented!()
let (tx, rx) = channel();
let send = self.1.clone().send_t((self.0.payload_ref().get_chat_id().clone(), tx));
// As we can't move self.0 (request) out, as we do in `send` we are
// forced to call `send_ref()`. This may have overhead and/or lead to
// wrong results because `R::send_ref` does the send.
//
// However `Request` documentation explicitly notes that `send{,_ref}`
// should **not** do any kind of work, so it's ok.
let request = self.0.send_ref();
ThrottlingSendRef::Registering { request, send, wait: rx }
}
}
@ -391,6 +402,70 @@ impl<R: Request> Future for ThrottlingSend<R> {
}
}
#[pin_project::pin_project(project = SendRefProj, project_replace = SendRefRepl)]
pub enum ThrottlingSendRef<R: Request> {
Registering {
request: R::SendRef,
#[pin]
send: ChanSend,
wait: Receiver<()>,
},
Pending {
request: R::SendRef,
#[pin]
wait: Receiver<()>,
},
Sent {
#[pin]
fut: R::SendRef,
},
Done,
}
impl<R: Request> Future for ThrottlingSendRef<R> {
type Output = Result<Output<R>, R::Err>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
SendRefProj::Registering { request: _, send, wait: _ } => match send.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRefRepl::Registering { request, send: _, wait } =
self.as_mut().project_replace(ThrottlingSendRef::Done)
{
self.as_mut().project_replace(ThrottlingSendRef::Pending { request, wait });
}
self.poll(cx)
}
},
SendRefProj::Pending { request: _, wait } => match wait.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRefRepl::Pending { request, wait: _ } =
self.as_mut().project_replace(ThrottlingSendRef::Done)
{
self.as_mut().project_replace(ThrottlingSendRef::Sent { fut: request });
}
self.poll(cx)
}
},
SendRefProj::Sent { fut } => {
let res = futures::ready!(fut.poll(cx));
self.set(ThrottlingSendRef::Done);
Poll::Ready(res)
}
SendRefProj::Done => Poll::Pending,
}
}
}
mod chan_send {
use std::{future::Future, pin::Pin};

View file

@ -3,7 +3,18 @@ use std::future::Future;
use crate::requests::{HasPayload, Output};
/// A ready-to-send telegram request.
///
// FIXME(waffle): Write better doc for the trait
///
/// ## Implementation notes
///
/// It is not recommended to do any kind of _work_ in `send` or `send_ref`.
/// Instead it's recommended to do all the (possible) stuff in the returned
/// future. In other words — keep it lazy.
///
/// This is crucial for request wrappers which may want to cancel and/or never
/// send the underlying request. E.g.: [`Throttle<B>`]'s `send_ref` calls
/// `B::send_ref` while _not_ meaning to really send the request right now.
#[cfg_attr(all(docsrs, feature = "nightly"), doc(spotlight))]
pub trait Request: HasPayload {
/*