[throttle] hash ChatId::ChannelUsername to prevent expensive string cloning & make send{,_ref} enums private

The future has come!
This commit is contained in:
Waffle 2020-10-02 13:28:46 +03:00
parent 7fdf9cd9a6
commit 6f03ca0954

View file

@ -21,6 +21,7 @@ use crate::{
requests::{HasPayload, Output, Request, Requester},
types::ChatId,
};
use std::hash::{Hash, Hasher};
const MINUTE: Duration = Duration::from_secs(50); // FIXME: min = sec * 10 only in tests
const SECOND: Duration = Duration::from_secs(1);
@ -60,30 +61,27 @@ impl Default for Limits {
/// Our current approach is to just give up and check `chat_id_a == chat_id_b`.
/// This may give incorrect results.
///
/// Also, current algorithm requires to `clone` `chat_id` several times, which
/// can be quire expensive for strings (though this may be fixed in the future)
///
/// As such, we encourage not to use `ChatId::ChannelUsername(u)` with this bot
/// wrapper.
pub struct Throttle<B> {
bot: B,
queue: mpsc::Sender<(ChatId, Sender<()>)>,
queue: mpsc::Sender<(Id, Sender<()>)>,
}
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(ChatId, Sender<()>)>) {
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(Id, Sender<()>)>) {
// FIXME: remove unnecessary ChatId clones
// FIXME(waffle): Make an research about data structures for this queue.
// Currently this is O(n) removing (n = number of elements
// stayed), amortized O(1) push (vec+vecrem).
let mut queue: Vec<(ChatId, Sender<()>)> = Vec::new(); // FIXME: with_cap
let mut queue: Vec<(Id, Sender<()>)> = Vec::new(); // FIXME: with_cap
// I wish there was special data structure for history which removed the
// need in 2 hashmaps
// (waffle)
let mut history: VecDeque<(ChatId, Instant)> = VecDeque::new();
let mut history: VecDeque<(Id, Instant)> = VecDeque::new();
// hchats[chat] = history.iter().filter(|(c, _)| c == chat).count()
let mut hchats: HashMap<ChatId, u32> = HashMap::new();
let mut hchats: HashMap<Id, u32> = HashMap::new();
let mut hchats_s = HashMap::new();
loop {
@ -285,6 +283,30 @@ impl<B: Requester> Requester for Throttle<B> {
}
}
/// Id used in worker.
///
/// It is used instead of `ChatId` to make copying cheap even in case of
/// usernames. (It just hashes username)
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
enum Id {
Id(i64),
Ch(u64),
}
impl From<&ChatId> for Id {
fn from(value: &ChatId) -> Self {
match value {
ChatId::Id(id) => Id::Id(*id),
ChatId::ChannelUsername(username) => {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
username.hash(&mut hasher);
let hash = hasher.finish();
Id::Ch(hash)
}
}
}
}
pub trait GetChatId {
fn get_chat_id(&self) -> &ChatId;
}
@ -295,7 +317,7 @@ impl GetChatId for SendMessage {
}
}
pub struct ThrottlingRequest<R>(R, mpsc::Sender<(ChatId, Sender<()>)>);
pub struct ThrottlingRequest<R>(R, mpsc::Sender<(Id, Sender<()>)>);
impl<R: HasPayload> HasPayload for ThrottlingRequest<R> {
type Payload = R::Payload;
@ -319,13 +341,13 @@ where
fn send(self) -> Self::Send {
let (tx, rx) = channel();
let send = self.1.send_t((self.0.payload_ref().get_chat_id().clone(), tx));
ThrottlingSend::Registering { request: self.0, send, wait: rx }
let send = self.1.send_t((self.0.payload_ref().get_chat_id().into(), tx));
ThrottlingSend(ThrottlingSendInner::Registering { request: self.0, send, wait: rx })
}
fn send_ref(&self) -> Self::SendRef {
let (tx, rx) = channel();
let send = self.1.clone().send_t((self.0.payload_ref().get_chat_id().clone(), tx));
let send = self.1.clone().send_t((self.0.payload_ref().get_chat_id().into(), tx));
// As we can't move self.0 (request) out, as we do in `send` we are
// forced to call `send_ref()`. This may have overhead and/or lead to
@ -335,12 +357,15 @@ where
// should **not** do any kind of work, so it's ok.
let request = self.0.send_ref();
ThrottlingSendRef::Registering { request, send, wait: rx }
ThrottlingSendRef(ThrottlingSendRefInner::Registering { request, send, wait: rx })
}
}
#[pin_project::pin_project]
pub struct ThrottlingSend<R: Request>(#[pin] ThrottlingSendInner<R>);
#[pin_project::pin_project(project = SendProj, project_replace = SendRepl)]
pub enum ThrottlingSend<R: Request> {
enum ThrottlingSendInner<R: Request> {
Registering {
request: R,
#[pin]
@ -363,16 +388,18 @@ impl<R: Request> Future for ThrottlingSend<R> {
type Output = Result<Output<R>, R::Err>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
let mut this = self.as_mut().project().0;
match this.as_mut().project() {
SendProj::Registering { request: _, send, wait: _ } => match send.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRepl::Registering { request, send: _, wait } =
self.as_mut().project_replace(ThrottlingSend::Done)
this.as_mut().project_replace(ThrottlingSendInner::Done)
{
self.as_mut().project_replace(ThrottlingSend::Pending { request, wait });
this.as_mut().project_replace(ThrottlingSendInner::Pending { request, wait });
}
self.poll(cx)
@ -384,9 +411,9 @@ impl<R: Request> Future for ThrottlingSend<R> {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRepl::Pending { request, wait: _ } =
self.as_mut().project_replace(ThrottlingSend::Done)
this.as_mut().project_replace(ThrottlingSendInner::Done)
{
self.as_mut().project_replace(ThrottlingSend::Sent { fut: request.send() });
this.as_mut().project_replace(ThrottlingSendInner::Sent { fut: request.send() });
}
self.poll(cx)
@ -394,7 +421,7 @@ impl<R: Request> Future for ThrottlingSend<R> {
},
SendProj::Sent { fut } => {
let res = futures::ready!(fut.poll(cx));
self.set(ThrottlingSend::Done);
this.set(ThrottlingSendInner::Done);
Poll::Ready(res)
}
SendProj::Done => Poll::Pending,
@ -402,8 +429,11 @@ impl<R: Request> Future for ThrottlingSend<R> {
}
}
#[pin_project::pin_project]
pub struct ThrottlingSendRef<R: Request>(#[pin] ThrottlingSendRefInner<R>);
#[pin_project::pin_project(project = SendRefProj, project_replace = SendRefRepl)]
pub enum ThrottlingSendRef<R: Request> {
enum ThrottlingSendRefInner<R: Request> {
Registering {
request: R::SendRef,
#[pin]
@ -426,16 +456,18 @@ impl<R: Request> Future for ThrottlingSendRef<R> {
type Output = Result<Output<R>, R::Err>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
let mut this = self.as_mut().project().0;
match this.as_mut().project() {
SendRefProj::Registering { request: _, send, wait: _ } => match send.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRefRepl::Registering { request, send: _, wait } =
self.as_mut().project_replace(ThrottlingSendRef::Done)
this.as_mut().project_replace(ThrottlingSendRefInner::Done)
{
self.as_mut().project_replace(ThrottlingSendRef::Pending { request, wait });
this.as_mut().project_replace(ThrottlingSendRefInner::Pending { request, wait });
}
self.poll(cx)
@ -447,9 +479,9 @@ impl<R: Request> Future for ThrottlingSendRef<R> {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRefRepl::Pending { request, wait: _ } =
self.as_mut().project_replace(ThrottlingSendRef::Done)
this.as_mut().project_replace(ThrottlingSendRefInner::Done)
{
self.as_mut().project_replace(ThrottlingSendRef::Sent { fut: request });
this.as_mut().project_replace(ThrottlingSendRefInner::Sent { fut: request });
}
self.poll(cx)
@ -457,7 +489,7 @@ impl<R: Request> Future for ThrottlingSendRef<R> {
},
SendRefProj::Sent { fut } => {
let res = futures::ready!(fut.poll(cx));
self.set(ThrottlingSendRef::Done);
this.set(ThrottlingSendRefInner::Done);
Poll::Ready(res)
}
SendRefProj::Done => Poll::Pending,
@ -472,27 +504,30 @@ mod chan_send {
use futures::task::{Context, Poll};
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot::Sender};
use crate::types::ChatId;
use crate::{
types::ChatId,
bot::limits::Id
};
pub(crate) trait SendTy {
fn send_t(self, val: (ChatId, Sender<()>)) -> ChanSend;
pub(super) trait SendTy {
fn send_t(self, val: (Id, Sender<()>)) -> ChanSend;
}
#[pin_project::pin_project]
pub struct ChanSend(#[pin] Inner); // FIXME
pub(super) struct ChanSend(#[pin] Inner); // FIXME
#[cfg(not(feature = "nightly"))]
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(ChatId, Sender<()>)>>>>>;
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(Id, Sender<()>)>>>>>;
#[cfg(feature = "nightly")]
type Inner = impl Future<Output = Result<(), SendError<(ChatId, Sender<()>)>>>;
type Inner = impl Future<Output = Result<(), SendError<(Id, Sender<()>)>>>;
impl SendTy for mpsc::Sender<(ChatId, Sender<()>)> {
fn send_t(mut self, val: (ChatId, Sender<()>)) -> ChanSend {
impl SendTy for mpsc::Sender<(Id, Sender<()>)> {
fn send_t(mut self, val: (Id, Sender<()>)) -> ChanSend {
#[cfg(feature = "nightly")]
{
fn def(
mut sender: mpsc::Sender<(ChatId, Sender<()>)>,
val: (ChatId, Sender<()>),
mut sender: mpsc::Sender<(Id, Sender<()>)>,
val: (Id, Sender<()>),
) -> Inner {
async move { sender.send(val).await }
}
@ -504,7 +539,7 @@ mod chan_send {
}
impl Future for ChanSend {
type Output = Result<(), SendError<(ChatId, Sender<()>)>>;
type Output = Result<(), SendError<(Id, Sender<()>)>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().0.poll(cx)