mirror of
https://github.com/teloxide/teloxide.git
synced 2025-03-14 11:44:04 +01:00
[throttle] refactor#0
- Move all data to the worker making it unshared. - Use mpsc channel to update requests queue - Yield if there are no requests in queue - Remove `:0` entries from `hchats` - Remove debug prints - Remove use of `block_on`
This commit is contained in:
parent
4e412b39e1
commit
f270613e7e
1 changed files with 115 additions and 64 deletions
|
@ -2,7 +2,7 @@ use crate::Bot;
|
|||
use std::collections::{VecDeque, HashMap};
|
||||
use std::time::Instant;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
use std::cmp::max;
|
||||
use crate::requests::{Requester, Request, Output, HasPayload, Payload};
|
||||
use crate::payloads::{GetMe, SendMessage};
|
||||
|
@ -17,8 +17,11 @@ use tokio::time::{delay_until, delay_for};
|
|||
use futures::stream::FuturesUnordered;
|
||||
use futures::executor::block_on;
|
||||
use futures::future::join3;
|
||||
use futures::future::ready;
|
||||
|
||||
const MINUTE: Duration = Duration::from_secs(10);
|
||||
// FIXME: rename to Throttle
|
||||
|
||||
const MINUTE: Duration = Duration::from_secs(50); // FIXME: min = sec * 10 only in tests
|
||||
const SECOND: Duration = Duration::from_secs(1);
|
||||
const DELAY: Duration = Duration::from_millis(250); // second/4
|
||||
|
||||
|
@ -43,42 +46,52 @@ impl Default for Limits {
|
|||
}
|
||||
|
||||
pub struct Limited<B> {
|
||||
// Some fields are probably only needed by the worker
|
||||
//limits: Limits,
|
||||
bot: B,
|
||||
queue: Arc<Mutex<VecDeque<(ChatId, Sender<()>)>>>, // FIXME: struct with fast remove and add
|
||||
history: Arc<Mutex<VecDeque<(ChatId, Instant)>>>,
|
||||
hchats: Arc<Mutex<HashMap<ChatId, u32>>>,
|
||||
queue: mpsc::Sender<(ChatId, Sender<()>)>,
|
||||
}
|
||||
|
||||
async fn worker(
|
||||
limits: Limits,
|
||||
queue: Arc<Mutex<VecDeque<(ChatId, Sender<()>)>>>,
|
||||
history: Arc<Mutex<VecDeque<(ChatId, Instant)>>>,
|
||||
hchats: Arc<Mutex<HashMap<ChatId, u32>>>,
|
||||
mut queue_rx: mpsc::Receiver<(ChatId, Sender<()>)>,
|
||||
) {
|
||||
// FIXME: remove unnecessary ChatId clones
|
||||
loop {
|
||||
println!("1");
|
||||
let mut history = history.lock().await;
|
||||
let mut hchats = hchats.lock().await;
|
||||
let mut queue = queue.lock().await;
|
||||
// FIXME: use spawn_blocking?
|
||||
|
||||
let now = dbg!(Instant::now());
|
||||
// FIXME: remove unnecessary ChatId clones
|
||||
|
||||
// FIXME: struct with fast random remove and append-to-the-end
|
||||
let mut queue: VecDeque<(ChatId, Sender<()>)> = VecDeque::new(); // FIXME: with_cap
|
||||
|
||||
let mut history: VecDeque<(ChatId, Instant)> = VecDeque::new();
|
||||
// hchats[chat] = history.iter().filter(|(c, _)| c == chat).count()
|
||||
let mut hchats: HashMap<ChatId, u32> = HashMap::new();
|
||||
|
||||
loop {
|
||||
// If there are no pending requests we are just waiting
|
||||
if queue.is_empty() {
|
||||
queue.push_back(queue_rx.recv().await.unwrap());
|
||||
}
|
||||
|
||||
// update local queue with latest requests
|
||||
while let Ok(e) = queue_rx.try_recv() {
|
||||
// FIXME: properly check for errors (stop when the bot's sender is dropped?)
|
||||
queue.push_back(e)
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
let min_back = now - MINUTE;
|
||||
let sec_back = now - SECOND;
|
||||
|
||||
println!("2");
|
||||
|
||||
// make history and hchats up-to-date
|
||||
while let Some((_, time)) = history.front() {
|
||||
// history is sorted, we found first up-to-date thing
|
||||
if time >= &min_back { break; }
|
||||
|
||||
if let Some((chat, _)) = history.pop_front() {
|
||||
hchats
|
||||
if let Entry::Occupied(entry) = hchats
|
||||
.entry(chat)
|
||||
.and_modify(|count| { *count -= 1; }); // TODO: remove entries with count == 0
|
||||
.and_modify(|count| { *count -= 1; }) {
|
||||
if entry.get() == 0 { entry.remove_entry(); }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -90,8 +103,6 @@ async fn worker(
|
|||
continue;
|
||||
}
|
||||
|
||||
println!("3");
|
||||
|
||||
let mut hchats_s = HashMap::new();
|
||||
for (chat, _) in history.iter().take_while(|(_, time)| time > &sec_back) {
|
||||
*hchats_s
|
||||
|
@ -99,36 +110,26 @@ async fn worker(
|
|||
.or_insert(0) += 1;
|
||||
}
|
||||
|
||||
|
||||
dbg!(&hchats_s);
|
||||
dbg!(&hchats);
|
||||
|
||||
dbg!(allowed);
|
||||
|
||||
let mut i = 0;
|
||||
while allowed > 0 && i < queue.len() {
|
||||
let chat = &queue[i].0;
|
||||
|
||||
if dbg!(hchats_s
|
||||
if hchats_s
|
||||
.get(chat)
|
||||
.copied()
|
||||
.unwrap_or(0) < limits.chat_s) &&
|
||||
dbg!(hchats
|
||||
.unwrap_or(0) < limits.chat_s &&
|
||||
hchats
|
||||
.get(chat)
|
||||
.copied()
|
||||
.unwrap_or(0) < limits.chat_m)
|
||||
.unwrap_or(0) < limits.chat_m
|
||||
{
|
||||
let chat = chat.clone();
|
||||
*hchats_s.entry(chat.clone()).or_insert(0) += 1;
|
||||
*hchats.entry(chat.clone()).or_insert(0) += 1;
|
||||
|
||||
println!("worker send");
|
||||
dbg!(&hchats_s);
|
||||
dbg!(&hchats);
|
||||
history.push_back((chat, Instant::now()));
|
||||
queue.remove(i).unwrap().1.send(());
|
||||
allowed -= 1;
|
||||
dbg!(allowed);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
|
@ -140,27 +141,15 @@ async fn worker(
|
|||
|
||||
impl<B> Limited<B> {
|
||||
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
|
||||
let history = Arc::new(Mutex::new(VecDeque::with_capacity(
|
||||
max(limits.chat_s, max(limits.overall_s, limits.chat_m)) as _
|
||||
)));
|
||||
|
||||
let queue = Arc::new(Mutex::new(VecDeque::new()));
|
||||
let hchats = Arc::new(Mutex::new(HashMap::new()));
|
||||
// FIXME: just a random number, currently
|
||||
let (queue_tx, queue_rx) = mpsc::channel(130);
|
||||
|
||||
let worker = worker(
|
||||
limits,
|
||||
Arc::clone(&queue),
|
||||
Arc::clone(&history),
|
||||
Arc::clone(&hchats),
|
||||
queue_rx,
|
||||
);
|
||||
|
||||
let this = Self {
|
||||
//limits,
|
||||
bot,
|
||||
history,
|
||||
queue,
|
||||
hchats,
|
||||
};
|
||||
let this = Self { bot, queue: queue_tx, };
|
||||
|
||||
(this, worker)
|
||||
}
|
||||
|
@ -180,7 +169,7 @@ impl<B: Requester> Requester for Limited<B> {
|
|||
C: Into<ChatId>,
|
||||
T: Into<String>
|
||||
{
|
||||
LimitedRequest(self.bot.send_message(chat_id, text), Arc::clone(&self.queue))
|
||||
LimitedRequest(self.bot.send_message(chat_id, text), self.queue.clone())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -195,7 +184,7 @@ impl GetChatId for SendMessage {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct LimitedRequest<R>(R, Arc<Mutex<VecDeque<(ChatId, Sender<()>)>>>);
|
||||
pub struct LimitedRequest<R>(R, mpsc::Sender<(ChatId, Sender<()>)>);
|
||||
|
||||
impl<R: HasPayload> HasPayload for LimitedRequest<R> {
|
||||
type Payload = R::Payload;
|
||||
|
@ -217,13 +206,12 @@ where
|
|||
type Send = LimitedSend<R>;
|
||||
type SendRef = LimitedSend<R>;
|
||||
|
||||
fn send(self) -> Self::Send {
|
||||
fn send(mut self) -> Self::Send {
|
||||
let (tx, rx) = channel();
|
||||
// FIXME
|
||||
let mut g = block_on(self.1.lock());
|
||||
g.push_back((self.0.payload_ref().get_chat_id().clone(), tx));
|
||||
LimitedSend::Pending {
|
||||
let send = self.1.send_t((self.0.payload_ref().get_chat_id().clone(), tx));
|
||||
LimitedSend::Registering {
|
||||
request: self.0,
|
||||
send,
|
||||
wait: rx,
|
||||
}
|
||||
}
|
||||
|
@ -235,6 +223,12 @@ where
|
|||
|
||||
#[pin_project::pin_project(project = SendProj, project_replace = SendRepl)]
|
||||
pub enum LimitedSend<R: Request> {
|
||||
Registering {
|
||||
request: R,
|
||||
#[pin]
|
||||
send: ChanSend,
|
||||
wait: Receiver<()>,
|
||||
},
|
||||
Pending {
|
||||
request: R,
|
||||
#[pin]
|
||||
|
@ -251,12 +245,22 @@ impl<R: Request> Future for LimitedSend<R> {
|
|||
type Output = Result<Output<R>, R::Err>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
println!("poll");
|
||||
match self.as_mut().project() {
|
||||
SendProj::Registering { request: _, send, wait: _ } => match send.poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(r) => {
|
||||
// FIXME(waffle): remove unwrap
|
||||
r.unwrap();
|
||||
if let SendRepl::Registering { request, send: _, wait } = self.as_mut().project_replace(LimitedSend::Done) {
|
||||
self.as_mut().project_replace(LimitedSend::Pending { request, wait });
|
||||
}
|
||||
|
||||
self.poll(cx)
|
||||
}
|
||||
},
|
||||
SendProj::Pending { request: _, wait } => match wait.poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(r) => {
|
||||
println!("pending-ready");
|
||||
// FIXME(waffle): remove unwrap
|
||||
r.unwrap();
|
||||
if let SendRepl::Pending { request, wait: _ } = self.as_mut().project_replace(LimitedSend::Done) {
|
||||
|
@ -267,9 +271,7 @@ impl<R: Request> Future for LimitedSend<R> {
|
|||
}
|
||||
},
|
||||
SendProj::Sent { fut } => {
|
||||
println!("sent");
|
||||
let res = futures::ready!(fut.poll(cx));
|
||||
println!("sent-ready");
|
||||
self.set(LimitedSend::Done);
|
||||
Poll::Ready(res)
|
||||
}
|
||||
|
@ -277,3 +279,52 @@ impl<R: Request> Future for LimitedSend<R> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
use chan_send::{ChanSend, SendTy as _};
|
||||
use crate::bot::limits::chan_send::SendTy;
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
mod chan_send {
|
||||
use tokio::sync::mpsc;
|
||||
use crate::types::ChatId;
|
||||
use tokio::sync::oneshot::Sender;
|
||||
use std::future::Future;
|
||||
use futures::task::{Context, Poll};
|
||||
use pin_project::__private::Pin;
|
||||
use tokio::sync::mpsc::error::SendError;
|
||||
|
||||
pub(crate) trait SendTy {
|
||||
fn send_t(self, val: (ChatId, Sender<()>)) -> ChanSend;
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub/*(crate) */struct ChanSend(#[pin] Inner); // FIXME
|
||||
|
||||
#[cfg(not(feature = "nightly"))]
|
||||
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(ChatId, Sender<()>)>>>>>;
|
||||
#[cfg(feature = "nightly")]
|
||||
type Inner = impl Future<Output = Result<(), SendError<(ChatId, Sender<()>)>>>;
|
||||
|
||||
impl SendTy for mpsc::Sender<(ChatId, Sender<()>)> {
|
||||
fn send_t(mut self, val: (ChatId, Sender<()>)) -> ChanSend {
|
||||
#[cfg(feature = "nightly")]
|
||||
{
|
||||
fn def(mut sender: mpsc::Sender<(ChatId, Sender<()>)>, val: (ChatId, Sender<()>)) -> Inner {
|
||||
async move { sender.send(val).await }
|
||||
}
|
||||
return ChanSend(def(self, val));
|
||||
}
|
||||
#[cfg(not(feature = "nightly"))]
|
||||
return ChanSend(Box::pin(async move { self.send(val).await }));
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for ChanSend {
|
||||
type Output = Result<(), SendError<(ChatId, Sender<()>)>>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.project().0.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Reference in a new issue