[throttle] format, docs, modfix and small additions

Waffle 2020-09-30 22:41:11 +03:00
parent 6714a9c503
commit b38e35b630
10 changed files with 141 additions and 100 deletions

View file

@@ -35,7 +35,7 @@ thiserror = "1.0.20"
once_cell = "1.4.0"
# FIXME(waffle): use crates.io once published
vecrem = { git = "https://github.com/WaffleLapkin/vecrem" }
vecrem = { git = "https://github.com/WaffleLapkin/vecrem", rev = "6b9b6f42342df8b75548c6ed387072ff235429b1" }
[features]
# features that require a nightly compiler

View file

@@ -1716,7 +1716,7 @@ impl Requester for Bot {
fn send_message<C, T>(&self, chat_id: C, text: T) -> JsonRequest<payloads::SendMessage>
where
C: Into<ChatId>,
T: Into<String>
T: Into<String>,
{
Self::SendMessage::new(self.clone(), payloads::SendMessage::new(chat_id, text))
}

View file

@@ -11,10 +11,8 @@ use once_cell::sync::OnceCell;
use crate::{
payloads::GetMe,
requests::{HasPayload, Request, Requester},
types::User,
types::{ChatId, User},
};
use crate::payloads::SendMessage;
use crate::types::ChatId;
/// `get_me` cache.
///
@@ -74,7 +72,7 @@ impl<B: Requester> Requester for CacheMe<B> {
fn send_message<C, T>(&self, chat_id: C, text: T) -> Self::SendMessage
where
C: Into<ChatId>,
T: Into<String>
T: Into<String>,
{
self.bot.send_message(chat_id, text)
}

View file

@@ -1,30 +1,35 @@
use crate::Bot;
use std::collections::{VecDeque, HashMap};
use std::time::Instant;
use std::sync::Arc;
use tokio::sync::{Mutex, mpsc};
use std::cmp::max;
use crate::requests::{Requester, Request, Output, HasPayload, Payload};
use crate::payloads::{GetMe, SendMessage};
use crate::types::ChatId;
use tokio::sync::oneshot::{Sender, Receiver, channel};
use std::future::Future;
use futures::task::{Context, Poll};
use pin_project::__private::Pin;
use core::time::Duration;
use futures::{TryFutureExt, StreamExt, FutureExt};
use tokio::time::{delay_until, delay_for};
use futures::stream::FuturesUnordered;
use futures::executor::block_on;
use futures::future::join3;
use futures::future::ready;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
future::Future,
pin::Pin,
time::{Duration, Instant},
};
// FIXME: rename to Throttle
use futures::task::{Context, Poll};
use tokio::{
sync::{
mpsc,
oneshot::{channel, Receiver, Sender},
},
time::delay_for,
};
use vecrem::VecExt;
use crate::{
bot::limits::chan_send::{ChanSend, SendTy},
payloads::SendMessage,
requests::{HasPayload, Output, Request, Requester},
types::ChatId,
};
const MINUTE: Duration = Duration::from_secs(50); // FIXME: min = sec * 10 only in tests
const SECOND: Duration = Duration::from_secs(1);
const DELAY: Duration = Duration::from_millis(250); // second/4
/// Telegram request limits.
///
/// This struct is used in [`Throttle`].
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct Limits {
/// Allowed messages in one chat per second
pub chat_s: u32,
@@ -34,26 +39,21 @@ pub struct Limits {
pub chat_m: u32,
}
// https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
/// Defaults are taken from the [Telegram documentation][tgdoc].
///
/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
impl Default for Limits {
fn default() -> Self {
Self {
chat_s: 1,
overall_s: 30,
chat_m: 20,
}
Self { chat_s: 1, overall_s: 30, chat_m: 20 }
}
}
pub struct Limited<B> {
pub struct Throttle<B> {
bot: B,
queue: mpsc::Sender<(ChatId, Sender<()>)>,
}
async fn worker(
limits: Limits,
mut queue_rx: mpsc::Receiver<(ChatId, Sender<()>)>,
) {
async fn worker(limits: Limits, mut queue_rx: mpsc::Receiver<(ChatId, Sender<()>)>) {
// FIXME: use spawn_blocking?
// FIXME: remove unnecessary ChatId clones
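As a side note on the `Limits` struct and its `Default` impl above: the fields are public, so a custom configuration can be built with struct-update syntax. A minimal sketch, assuming a module path that this diff does not confirm:

use crate::bot::limits::Limits; // import path is an assumption

fn conservative_limits() -> Limits {
    // Illustrative only: stricter than the documented Telegram defaults.
    Limits {
        overall_s: 10,      // at most 10 messages per second across all chats
        ..Limits::default() // keeps chat_s: 1 and chat_m: 20
    }
}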
@@ -88,19 +88,26 @@ async fn worker(
// make history and hchats up-to-date
while let Some((_, time)) = history.front() {
// history is sorted, we found first up-to-date thing
if time >= &min_back { break; }
if time >= &min_back {
break;
}
if let Some((chat, _)) = history.pop_front() {
if let Entry::Occupied(entry) = hchats
.entry(chat)
.and_modify(|count| { *count -= 1; }) {
if *entry.get() == 0 { entry.remove_entry(); }
if let Entry::Occupied(entry) = hchats.entry(chat).and_modify(|count| {
*count -= 1;
}) {
if *entry.get() == 0 {
entry.remove_entry();
}
}
}
}
// as truncates which is ok since in case of truncation it would always be >= limits.overall_s
let mut allowed = limits.overall_s.saturating_sub(history.iter().take_while(|(_, time)| time > &sec_back).count() as u32);
// as truncates which is ok since in case of truncation it would always be >=
// limits.overall_s
let mut allowed = limits
.overall_s
.saturating_sub(history.iter().take_while(|(_, time)| time > &sec_back).count() as u32);
if allowed == 0 {
hchats_s.clear();
@@ -109,23 +116,15 @@ async fn worker(
}
for (chat, _) in history.iter().take_while(|(_, time)| time > &sec_back) {
*hchats_s
.entry(chat.clone())
.or_insert(0) += 1;
*hchats_s.entry(chat.clone()).or_insert(0) += 1;
}
let mut queue_rem = queue.removing();
while let Some(entry) = queue_rem.next() {
let chat = &entry.value().0;
let cond = {
hchats_s
.get(chat)
.copied()
.unwrap_or(0) < limits.chat_s &&
hchats
.get(chat)
.copied()
.unwrap_or(0) < limits.chat_m
hchats_s.get(chat).copied().unwrap_or(0) < limits.chat_s
&& hchats.get(chat).copied().unwrap_or(0) < limits.chat_m
};
if cond {
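The two worker hunks above only reformat the sliding-window bookkeeping: timestamps older than the window are popped from `history` (decrementing the per-chat counters), and the per-second budget is whatever is left of `limits.overall_s`. A standalone sketch of that idea, with illustrative names rather than the actual worker code:

use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Drop timestamps that fell out of the window and return how many remain,
// i.e. how many requests were sent within the last `window`.
fn sent_in_window(history: &mut VecDeque<Instant>, window: Duration) -> usize {
    let cutoff = Instant::now() - window;
    while matches!(history.front(), Some(t) if *t < cutoff) {
        history.pop_front();
    }
    history.len()
}

The remaining budget is then `limits.overall_s` minus that count, saturating at zero just like the `saturating_sub` above.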
@@ -151,37 +150,70 @@ async fn worker(
}
}
impl<B> Limited<B> {
impl<B> Throttle<B> {
/// Creates a new [`Throttle`] alongside the worker future.
///
/// Note: [`Throttle`] will only send requests if the returned worker is
/// polled/spawned/awaited.
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
// FIXME: just a random number, currently
let (queue_tx, queue_rx) = mpsc::channel(130);
let worker = worker(
limits,
queue_rx,
);
let worker = worker(limits, queue_rx);
let this = Self { bot, queue: queue_tx, };
let this = Self { bot, queue: queue_tx };
(this, worker)
}
/// Creates a new [`Throttle`], spawning the worker with `tokio::spawn`.
///
/// Note: it's recommended to use [`RequesterExt::throttle`] instead.
pub fn new_spawn(bot: B, limits: Limits) -> Self
where
// Basically, I hate this bound.
// This is yet another problem caused by [rust-lang/#76882].
// And I think it *is* a bug.
//
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
//
// Though crucially I can't think of a case with non-static bot.
// But anyway, it doesn't change the fact that this bound is redundant.
//
// (waffle)
B: 'static,
{
let (this, worker) = Self::new(bot, limits);
tokio::spawn(worker);
this
}
/// Allows access to the inner bot
pub fn inner(&self) -> &B {
&self.bot
}
/// Unwraps the inner bot
pub fn into_inner(self) -> B {
self.bot
}
}
impl<B: Requester> Requester for Limited<B> {
impl<B: Requester> Requester for Throttle<B> {
type GetMe = B::GetMe;
fn get_me(&self) -> Self::GetMe {
self.bot.get_me()
}
type SendMessage = LimitedRequest<B::SendMessage>;
type SendMessage = ThrottlingRequest<B::SendMessage>;
fn send_message<C, T>(&self, chat_id: C, text: T) -> Self::SendMessage
where
C: Into<ChatId>,
T: Into<String>
T: Into<String>,
{
LimitedRequest(self.bot.send_message(chat_id, text), self.queue.clone())
ThrottlingRequest(self.bot.send_message(chat_id, text), self.queue.clone())
}
}
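A usage sketch for the two constructors above; `Bot::from_env` and the import paths are assumptions about the surrounding crate, not something this diff shows:

use crate::bot::{Limits, Throttle};
use crate::Bot;

async fn setup() {
    // Variant 1: create the wrapper and drive the worker yourself.
    let (throttled, worker) = Throttle::new(Bot::from_env(), Limits::default());
    tokio::spawn(worker);

    // Variant 2: let `new_spawn` spawn the worker (requires a running Tokio runtime).
    let also_throttled = Throttle::new_spawn(Bot::from_env(), Limits::default());

    let _ = (throttled, also_throttled);
}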
@@ -196,9 +228,9 @@ impl GetChatId for SendMessage {
}
}
pub struct LimitedRequest<R>(R, mpsc::Sender<(ChatId, Sender<()>)>);
pub struct ThrottlingRequest<R>(R, mpsc::Sender<(ChatId, Sender<()>)>);
impl<R: HasPayload> HasPayload for LimitedRequest<R> {
impl<R: HasPayload> HasPayload for ThrottlingRequest<R> {
type Payload = R::Payload;
fn payload_mut(&mut self) -> &mut Self::Payload {
@@ -210,7 +242,7 @@ impl<R: HasPayload> HasPayload for LimitedRequest<R> {
}
}
impl<R: Request> Request for LimitedRequest<R>
impl<R: Request> Request for ThrottlingRequest<R>
where
<R as HasPayload>::Payload: GetChatId,
{
@@ -218,14 +250,10 @@ where
type Send = LimitedSend<R>;
type SendRef = LimitedSend<R>;
fn send(mut self) -> Self::Send {
fn send(self) -> Self::Send {
let (tx, rx) = channel();
let send = self.1.send_t((self.0.payload_ref().get_chat_id().clone(), tx));
LimitedSend::Registering {
request: self.0,
send,
wait: rx,
}
LimitedSend::Registering { request: self.0, send, wait: rx }
}
fn send_ref(&self) -> Self::SendRef {
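`send` above registers the request with the worker through the queue and then waits on a oneshot channel before forwarding the underlying request. A minimal sketch of that handshake pattern in isolation (illustrative names, not the crate's API):

use tokio::sync::{mpsc, oneshot};

// Ask the worker for permission and wait until it signals `()` back.
async fn acquire_permit(queue: &mut mpsc::Sender<oneshot::Sender<()>>) {
    let (tx, rx) = oneshot::channel();
    queue.send(tx).await.expect("worker stopped");
    rx.await.expect("worker stopped");
}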
@@ -263,7 +291,9 @@ impl<R: Request> Future for LimitedSend<R> {
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRepl::Registering { request, send: _, wait } = self.as_mut().project_replace(LimitedSend::Done) {
if let SendRepl::Registering { request, send: _, wait } =
self.as_mut().project_replace(LimitedSend::Done)
{
self.as_mut().project_replace(LimitedSend::Pending { request, wait });
}
@@ -275,7 +305,9 @@ impl<R: Request> Future for LimitedSend<R> {
Poll::Ready(r) => {
// FIXME(waffle): remove unwrap
r.unwrap();
if let SendRepl::Pending { request, wait: _ } = self.as_mut().project_replace(LimitedSend::Done) {
if let SendRepl::Pending { request, wait: _ } =
self.as_mut().project_replace(LimitedSend::Done)
{
self.as_mut().project_replace(LimitedSend::Sent { fut: request.send() });
}
@@ -292,27 +324,19 @@ impl<R: Request> Future for LimitedSend<R> {
}
}
use chan_send::{ChanSend, SendTy as _};
use crate::bot::limits::chan_send::SendTy;
use std::collections::hash_map::Entry;
use core::mem;
use vecrem::VecExt;
mod chan_send {
use tokio::sync::mpsc;
use crate::types::ChatId;
use tokio::sync::oneshot::Sender;
use std::future::Future;
use futures::task::{Context, Poll};
use pin_project::__private::Pin;
use tokio::sync::mpsc::error::SendError;
use std::future::Future;
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot::Sender};
pub(crate) trait SendTy {
fn send_t(self, val: (ChatId, Sender<()>)) -> ChanSend;
}
#[pin_project::pin_project]
pub/*(crate) */struct ChanSend(#[pin] Inner); // FIXME
pub struct ChanSend(#[pin] Inner); // FIXME
#[cfg(not(feature = "nightly"))]
type Inner = Pin<Box<dyn Future<Output = Result<(), SendError<(ChatId, Sender<()>)>>>>>;
@@ -323,7 +347,10 @@ mod chan_send {
fn send_t(mut self, val: (ChatId, Sender<()>)) -> ChanSend {
#[cfg(feature = "nightly")]
{
fn def(mut sender: mpsc::Sender<(ChatId, Sender<()>)>, val: (ChatId, Sender<()>)) -> Inner {
fn def(
mut sender: mpsc::Sender<(ChatId, Sender<()>)>,
val: (ChatId, Sender<()>),
) -> Inner {
async move { sender.send(val).await }
}
return ChanSend(def(self, val));
@@ -340,5 +367,4 @@ mod chan_send {
self.project().0.poll(cx)
}
}
}
}
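The `chan_send` module above wraps `mpsc::Sender::send` in a nameable future: on stable the `Inner` alias boxes and type-erases the async block, while the nightly path (not visible in this hunk) can name its type directly. A reduced sketch of the stable-side pattern; the `Send` bound is added here for illustration and is not present in the alias above:

use std::{future::Future, pin::Pin};

// Type-erase any future into a pinned, boxed trait object so it can sit
// behind a stable type alias.
type BoxedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

fn erase<F, T>(fut: F) -> BoxedFuture<T>
where
    F: Future<Output = T> + Send + 'static,
{
    Box::pin(fut)
}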

View file

@@ -21,6 +21,7 @@ mod limits;
pub use auto_send::AutoSend;
pub use cache_me::CacheMe;
pub use limits::{Limits, Throttle};
pub(crate) const TELOXIDE_TOKEN: &str = "TELOXIDE_TOKEN";
pub(crate) const TELOXIDE_PROXY: &str = "TELOXIDE_PROXY";

View file

@@ -27,8 +27,11 @@ pub mod prelude;
pub mod requests;
pub mod types;
// FIXME(waffle): made `pub` to reexport bot wrappers, in future we may want to
// reexport them from elsewhere
pub mod bot;
// reexported
mod bot;
mod errors;
// implementation details

View file

@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use crate::{
requests::{HasPayload, Payload},
types::{ChatId, Message, ParseMode, ReplyMarkup}
types::{ChatId, Message, ParseMode, ReplyMarkup},
};
/// Use this method to send text messages.

View file

@@ -1,2 +1 @@
pub use crate::payloads::GetMeSetters as _;
pub use crate::payloads::SendMessageSetters as _;
pub use crate::payloads::{GetMeSetters as _, SendMessageSetters as _};

View file

@@ -1,8 +1,5 @@
use crate::{
payloads::{
GetMe,
SendMessage
},
payloads::{GetMe, SendMessage},
requests::Request,
types::ChatId,
};

View file

@@ -1,4 +1,8 @@
use crate::{requests::Requester, AutoSend, CacheMe};
use crate::{
bot::{CacheMe, Limits, Throttle},
requests::Requester,
AutoSend,
};
pub trait RequesterExt: Requester {
/// Add `get_me` caching ability, see [`CacheMe`] for more.
@@ -16,6 +20,19 @@ pub trait RequesterExt: Requester {
{
AutoSend::new(self)
}
/// Add throttling ability, see [`Throttle`] for more.
///
/// Note: this spawns the worker, just like [`Throttle::new_spawn`] does.
fn throttle(self, limits: Limits) -> Throttle<Self>
where
Self: Sized,
// >:(
// (waffle)
Self: 'static,
{
Throttle::new_spawn(self, limits)
}
}
impl<T> RequesterExt for T
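A usage sketch for the new extension method; the import paths, `Bot::from_env`, and the `ChatId::Id` variant are assumptions not confirmed by this diff:

use crate::{
    bot::Limits,
    requests::{Request, Requester, RequesterExt},
    types::ChatId,
    Bot,
};

async fn run() {
    // Wrap the bot; the throttling worker is spawned internally,
    // exactly as `Throttle::new_spawn` would do.
    let bot = Bot::from_env().throttle(Limits::default());

    // The request queues up with the worker before it is actually sent.
    bot.send_message(ChatId::Id(123_456), "hello")
        .send()
        .await
        .unwrap();
}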