[throttle] use channel close to send unlock "messages"

Also use `with_capacity` for queue
This commit is contained in:
Waffle 2020-10-02 18:37:15 +03:00
parent 380e189ec7
commit d43f2514d3
3 changed files with 97 additions and 64 deletions

View file

@ -17,7 +17,7 @@ authors = [
[dependencies]
futures = "0.3.5"
tokio = { version = "0.2.21", features = ["fs", "stream", "full"] }
tokio = { version = "0.2.21", features = ["fs", "stream"] }
tokio-util = "0.3.1"
pin-project = "0.4.23"
bytes = "0.5.5"
@ -33,6 +33,7 @@ derive_more = "0.99.9"
mime = "0.3.16"
thiserror = "1.0.20"
once_cell = "1.4.0"
never = "0.1.0"
# FIXME(waffle): use crates.io once published
vecrem = { git = "https://github.com/WaffleLapkin/vecrem", rev = "6b9b6f42342df8b75548c6ed387072ff235429b1" }

View file

@ -1,11 +1,13 @@
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
future::Future,
hash::{Hash, Hasher},
pin::Pin,
time::{Duration, Instant},
};
use futures::task::{Context, Poll};
use never::Never;
use tokio::{
sync::{
mpsc::{self, error::TryRecvError},
@ -21,51 +23,58 @@ use crate::{
requests::{HasPayload, Output, Request, Requester},
types::ChatId,
};
use std::hash::{Hash, Hasher};
// Throttling is quite complicated this comment describes the algorithm of current implementation.
// NOTE: this only describes CURRENT implementation. Implementation may change at any time.
// Throttling is quite complicated this comment describes the algorithm of
// current implementation. NOTE: this only describes CURRENT implementation.
// Implementation may change at any time.
//
// ### Request
//
// When throttling request is sent, it sends a tuple of `ChatId` (more accurately, just local `Id`)
// and `Sender<()>` to the worker. Then the request waits for notification from worker. When
// notification is received it sends underlying request.
// When throttling request is sent, it sends a tuple of `ChatId` (more
// accurately, just local `Id`) and `Sender<()>` to the worker. Then the request
// waits for notification from worker. When notification is received it sends
// underlying request.
//
// ### Worker
//
// Worker does the most important job - it checks for limit exceed.
//
// The worker stores "history" of requests sent in last minute (and to which chats the were sent)
// and queue of pending updates.
// The worker stores "history" of requests sent in last minute (and to which
// chats the were sent) and queue of pending updates.
//
// The worker does the following algorithm loop:
//
// 1. If queue is empty wait for the first message in incoming channel (and adds it to queue).
// 1. If queue is empty wait for the first message in incoming channel (and adds
// it to queue).
//
// 2. Read all present messages from incoming channel and transfer them to queue.
// 2. Read all present messages from incoming channel and transfer them to
// queue.
//
// 3. Record current time.
//
// 4. Clear history from records which time < (current - minute)
//
// 5. Count all requests in which were sent last second, `allowed = limit.overall_s - count`
// 5. Count all requests in which were sent last second,
// `allowed = limit.overall_s - count`
//
// 6. If `allowed == 0` wait a bit and `continue` to the next iteration
//
// 7. Count how many requests were sent to which chats (i.e.: create `Map<ChatId, Count>`)
// (note: the same map, but for last minute also exists, but it's updated, instead of recreation)
// 7. Count how many requests were sent to which chats (i.e.: create
// `Map<ChatId, Count>`) (note: the same map, but for last minute also
// exists, but it's updated, instead of recreation)
//
// 8. While `allowed >= 0` search for requests which chat hasn't exceed limits
// (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify request
// that it can be now executed, increase counts, add record to history.
// (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify
// request that it can be now executed, increase counts, add record to
// history.
const MINUTE: Duration = Duration::from_secs(60);
const SECOND: Duration = Duration::from_secs(1);
// Delay between worker iterations.
//
// For now it's `second/4`, but that number is chosen pretty randomly, we may want to change this.
// For now it's `second/4`, but that number is chosen pretty randomly, we may
// want to change this.
const DELAY: Duration = Duration::from_millis(250);
/// Telegram request limits.
@ -102,20 +111,21 @@ impl Default for Limits {
/// bot to never receive responds from the bot or receive them in wrong order.
///
/// This bot wrapper automatically checks for limits, suspending requests until
/// they could be sent without exceeding limits (request order in chats is not changed).
/// they could be sent without exceeding limits (request order in chats is not
/// changed).
///
/// It's recommended to use this wrapper before other wrappers (i.e.: `SomeWrapper<Throttle<Bot>>`)
/// because if done otherwise inner wrappers may cause `Throttle` to miscalculate limits usage.
/// It's recommended to use this wrapper before other wrappers (i.e.:
/// `SomeWrapper<Throttle<Bot>>`) because if done otherwise inner wrappers may
/// cause `Throttle` to miscalculate limits usage.
///
/// [limits]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
///
/// ## Examples
///
/// ```
/// use teloxide_core::{bot::Limits, Bot, requests::RequesterExt};
/// use teloxide_core::{bot::Limits, requests::RequesterExt, Bot};
///
/// let bot = Bot::new("TOKEN")
/// .throttle(Limits::default());
/// let bot = Bot::new("TOKEN").throttle(Limits::default());
///
/// /* send many requests here */
/// ```
@ -134,14 +144,17 @@ impl Default for Limits {
/// wrapper.
pub struct Throttle<B> {
bot: B,
queue: mpsc::Sender<(Id, Sender<()>)>,
// Sender<Never> is used to pass the signal to unlock by closing the channel.
queue: mpsc::Sender<(Id, Sender<Never>)>,
}
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<()>)>) {
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<Never>)>) {
// +- Same idea as in `Throttle::new`
let cap = limits.overall_s + (limits.overall_s / 4);
// FIXME(waffle): Make an research about data structures for this queue.
// Currently this is O(n) removing (n = number of elements
// stayed), amortized O(1) push (vec+vecrem).
let mut queue: Vec<(Id, Sender<()>)> = Vec::new(); // FIXME: with_cap
let mut queue: Vec<(Id, Sender<Never>)> = Vec::with_capacity(cap as usize);
// I wish there was special data structure for history which removed the
// need in 2 hashmaps
@ -259,11 +272,8 @@ async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<()>)>)
history.push_back((chat.clone(), Instant::now()));
}
// Explicitly ignore result.
//
// If request doesn't listen to unlock channel we don't want
// anything to do with it.
let _ = entry.remove().1.send(());
// This will close the channel unlocking associated request
drop(entry.remove());
// We've "sent" 1 request, so now we can send 1 less
allowed -= 1;
@ -387,7 +397,7 @@ impl GetChatId for SendMessage {
}
}
pub struct ThrottlingRequest<R>(R, mpsc::Sender<(Id, Sender<()>)>);
pub struct ThrottlingRequest<R>(R, mpsc::Sender<(Id, Sender<Never>)>);
impl<R: HasPayload> HasPayload for ThrottlingRequest<R> {
type Payload = R::Payload;
@ -440,12 +450,12 @@ enum ThrottlingSendInner<R: Request> {
request: R,
#[pin]
send: ChanSend,
wait: Receiver<()>,
wait: Receiver<Never>,
},
Pending {
request: R,
#[pin]
wait: Receiver<()>,
wait: Receiver<Never>,
},
Sent {
#[pin]
@ -463,13 +473,21 @@ impl<R: Request> Future for ThrottlingSend<R> {
match this.as_mut().project() {
SendProj::Registering { request: _, send, wait: _ } => match send.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
Poll::Ready(res) => {
if let SendRepl::Registering { request, send: _, wait } =
this.as_mut().project_replace(ThrottlingSendInner::Done)
{
this.as_mut().project_replace(ThrottlingSendInner::Pending { request, wait });
match res {
Ok(()) => this
.as_mut()
.project_replace(ThrottlingSendInner::Pending { request, wait }),
// The worker is unlikely to drop queue before sending all requests,
// but just in case it has dropped the queue, we want to just send the
// request.
Err(_) => this
.as_mut()
.project_replace(ThrottlingSendInner::Sent { fut: request.send() }),
};
}
self.poll(cx)
@ -478,12 +496,17 @@ impl<R: Request> Future for ThrottlingSend<R> {
SendProj::Pending { request: _, wait } => match wait.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
// Worker pass "message" to unlock us by closing the channel
match r {
Ok(never) => match never {},
Err(_) => {}
}
if let SendRepl::Pending { request, wait: _ } =
this.as_mut().project_replace(ThrottlingSendInner::Done)
{
this.as_mut().project_replace(ThrottlingSendInner::Sent { fut: request.send() });
this.as_mut()
.project_replace(ThrottlingSendInner::Sent { fut: request.send() });
}
self.poll(cx)
@ -508,12 +531,12 @@ enum ThrottlingSendRefInner<R: Request> {
request: R::SendRef,
#[pin]
send: ChanSend,
wait: Receiver<()>,
wait: Receiver<Never>,
},
Pending {
request: R::SendRef,
#[pin]
wait: Receiver<()>,
wait: Receiver<Never>,
},
Sent {
#[pin]
@ -531,13 +554,21 @@ impl<R: Request> Future for ThrottlingSendRef<R> {
match this.as_mut().project() {
SendRefProj::Registering { request: _, send, wait: _ } => match send.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
Poll::Ready(res) => {
if let SendRefRepl::Registering { request, send: _, wait } =
this.as_mut().project_replace(ThrottlingSendRefInner::Done)
{
this.as_mut().project_replace(ThrottlingSendRefInner::Pending { request, wait });
match res {
Ok(()) => this
.as_mut()
.project_replace(ThrottlingSendRefInner::Pending { request, wait }),
// The worker is unlikely to drop queue before sending all requests,
// but just in case it has dropped the queue, we want to just send the
// request.
Err(_) => this
.as_mut()
.project_replace(ThrottlingSendRefInner::Sent { fut: request }),
};
}
self.poll(cx)
@ -546,12 +577,17 @@ impl<R: Request> Future for ThrottlingSendRef<R> {
SendRefProj::Pending { request: _, wait } => match wait.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
// Worker pass "message" to unlock us by closing the channel
match r {
Ok(never) => match never {},
Err(_) => {}
}
if let SendRefRepl::Pending { request, wait: _ } =
this.as_mut().project_replace(ThrottlingSendRefInner::Done)
{
this.as_mut().project_replace(ThrottlingSendRefInner::Sent { fut: request });
this.as_mut()
.project_replace(ThrottlingSendRefInner::Sent { fut: request });
}
self.poll(cx)
@ -567,37 +603,34 @@ impl<R: Request> Future for ThrottlingSendRef<R> {
}
}
mod chan_send {
use std::{future::Future, pin::Pin};
use futures::task::{Context, Poll};
use never::Never;
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot::Sender};
use crate::{
types::ChatId,
bot::limits::Id
};
use crate::bot::limits::Id;
pub(super) trait SendTy {
fn send_t(self, val: (Id, Sender<()>)) -> ChanSend;
fn send_t(self, val: (Id, Sender<Never>)) -> ChanSend;
}
#[pin_project::pin_project]
pub(super) struct ChanSend(#[pin] Inner);
#[cfg(not(feature = "nightly"))]
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(Id, Sender<()>)>>>>>;
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(Id, Sender<Never>)>>>>>;
#[cfg(feature = "nightly")]
type Inner = impl Future<Output = Result<(), SendError<(Id, Sender<()>)>>>;
type Inner = impl Future<Output = Result<(), SendError<(Id, Sender<Never>)>>>;
impl SendTy for mpsc::Sender<(Id, Sender<()>)> {
fn send_t(mut self, val: (Id, Sender<()>)) -> ChanSend {
impl SendTy for mpsc::Sender<(Id, Sender<Never>)> {
fn send_t(mut self, val: (Id, Sender<Never>)) -> ChanSend {
#[cfg(feature = "nightly")]
{
fn def(
mut sender: mpsc::Sender<(Id, Sender<()>)>,
val: (Id, Sender<()>),
mut sender: mpsc::Sender<(Id, Sender<Never>)>,
val: (Id, Sender<Never>),
) -> Inner {
async move { sender.send(val).await }
}
@ -609,7 +642,7 @@ mod chan_send {
}
impl Future for ChanSend {
type Output = Result<(), SendError<(Id, Sender<()>)>>;
type Output = Result<(), SendError<(Id, Sender<Never>)>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().0.poll(cx)

View file

@ -3,7 +3,6 @@ use std::future::Future;
use crate::requests::{HasPayload, Output};
/// A ready-to-send telegram request.
///
// FIXME(waffle): Write better doc for the trait
///
/// ## Implementation notes