Merge pull request #77 from teloxide/adaptor_info

Adaptor info
This commit is contained in:
Hirrolot 2021-06-30 23:55:32 +06:00 committed by GitHub
commit bd104a0a08
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 278 additions and 137 deletions

View file

@ -10,12 +10,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`} ([#76][pr76])
- `DefaultParseMode::parse_mode` which allows to get currently used default parse mode ([#77][pr77])
- `Thrrotle::{limits,set_limits,with_queue_full_fn,spawn_with_queue_full_fn}` functions ([#77][pr77])
- Getters for fields nested in `Chat` ([#80][pr80])
- API errors: `ApiError::NotEnoughRightsToManagePins`, `ApiError::BotKickedFromSupergroup` ([#84][pr84])
- Telegram bot API 5.2 support ([#86][pr86])
- `net::default_reqwest_settings` function ([#90][pr90])
[pr75]: https://github.com/teloxide/teloxide-core/pull/75
[pr77]: https://github.com/teloxide/teloxide-core/pull/77
[pr76]: https://github.com/teloxide/teloxide-core/pull/76
[pr80]: https://github.com/teloxide/teloxide-core/pull/80
[pr84]: https://github.com/teloxide/teloxide-core/pull/84
@ -26,7 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Message::url` now returns links to messages in private groups too ([#80][pr80])
- Refactor `ChatMember` methods ([#74][pr74])
- impl `Deref<Target = ChatMemberKind>` to make `ChatMemberKind`'s methods callible directly on `ChatMember`
- impl `Deref<Target = ChatMemberKind>` to make `ChatMemberKind`'s methods callable directly on `ChatMember`
- Add `ChatMemberKind::is_{creator,administrator,member,restricted,left,kicked}` which check `kind` along with `is_privileged` and `is_in_chat` which combine some of the above.
- Refactor privilege getters
- Rename `ChatAction::{RecordAudio => RecordVoice, UploadAudio => UploadVoice}` ([#86][pr86])

View file

@ -34,6 +34,11 @@ impl<B> DefaultParseMode<B> {
pub fn into_inner(self) -> B {
self.bot
}
/// Returns currently used [`ParseMode`].
pub fn parse_mode(&self) -> ParseMode {
self.mode
}
}
macro_rules! f {

View file

@ -23,91 +23,6 @@ use crate::{
types::*,
};
// Throttling is quite complicated. This comment describes the algorithm of the
// current implementation.
//
// ### Request
//
// When a throttling request is sent, it sends a tuple of `ChatId` and
// `Sender<()>` to the worker. Then the request waits for a notification from
// the worker. When notification is received, it sends the underlying request.
//
// ### Worker
//
// The worker does the most important job -- it ensures that the limits are
// never exceeded.
//
// The worker stores a history of requests sent in the last minute (and to which
// chats they were sent) and a queue of pending updates.
//
// The worker does the following algorithm loop:
//
// 1. If the queue is empty, wait for the first message in incoming channel (and
// add it to the queue).
//
// 2. Read all present messages from an incoming channel and transfer them to
// the queue.
//
// 3. Record the current time.
//
// 4. Clear the history from records whose time < (current time - minute).
//
// 5. Count all requests which were sent last second, `allowed =
// limit.messages_per_sec_overall - count`.
//
// 6. If `allowed == 0` wait a bit and `continue` to the next iteration.
//
// 7. Count how many requests were sent to which chats (i.e.: create
// `Map<ChatId, Count>`). (Note: the same map, but for last minute also exists,
// but it's updated, instead of recreation.)
//
// 8. While `allowed >= 0` search for requests which chat haven't exceed the
// limits (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify
// the request that it can be now executed, increase counts, add record to the
// history.
const MINUTE: Duration = Duration::from_secs(60);
const SECOND: Duration = Duration::from_secs(1);
// Delay between worker iterations.
//
// For now it's `second/4`, but that number is chosen pretty randomly, we may
// want to change this.
const DELAY: Duration = Duration::from_millis(250);
/// Telegram request limits.
///
/// This struct is used in [`Throttle`].
///
/// Note that you may ask telegram [@BotSupport] to increase limits for your
/// particular bot if it has a lot of users (but they may or may not do that).
///
/// [@BotSupport]: https://t.me/botsupport
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub struct Limits {
/// Allowed messages in one chat per second.
pub messages_per_sec_chat: u32,
/// Allowed messages in one chat per minute.
pub messages_per_min_chat: u32,
/// Allowed messages per second.
pub messages_per_sec_overall: u32,
}
/// Defaults are taken from [telegram documentation][tgdoc].
///
/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
impl Default for Limits {
fn default() -> Self {
Self {
messages_per_sec_chat: 1,
messages_per_sec_overall: 30,
messages_per_min_chat: 20,
}
}
}
/// Automatic request limits respecting mechanism.
///
/// Telegram has strict [limits], which, if exceeded will sooner or later cause
@ -151,6 +66,188 @@ pub struct Throttle<B> {
bot: B,
// `RequestLock` allows to unlock requests (allowing them to be sent).
queue: mpsc::Sender<(ChatIdHash, RequestLock)>,
info_tx: mpsc::Sender<InfoMessage>,
}
impl<B> Throttle<B> {
/// Creates new [`Throttle`] alongside with worker future.
///
/// Note: [`Throttle`] will only send requests if returned worker is
/// polled/spawned/awaited.
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
Self::with_queue_full_fn(bot, limits, |pending| async move {
log::warn!("Throttle queue is full ({} pending requests)", pending);
})
}
/// Creates new [`Throttle`] alongside with worker future.
///
/// `queue_full` function is called when internal queue is full.
///
/// Note: [`Throttle`] will only send requests if returned worker is
/// polled/spawned/awaited.
pub fn with_queue_full_fn<F, Fut>(
bot: B,
limits: Limits,
queue_full: F,
) -> (Self, impl Future<Output = ()>)
where
F: Fn(usize) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
let (info_tx, info_rx) = mpsc::channel(2);
let worker = worker(limits, rx, info_rx, queue_full);
let this = Self {
bot,
queue: tx,
info_tx,
};
(this, worker)
}
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
///
/// Note: it's recommended to use [`RequesterExt::throttle`] instead.
///
/// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle
pub fn new_spawn(bot: B, limits: Limits) -> Self {
// new/with_queue_full_fn copypasted here to avoid [rust-lang/#76882]
//
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
let (info_tx, info_rx) = mpsc::channel(2);
let worker = worker(limits, rx, info_rx, |pending| async move {
log::warn!("Throttle queue is full ({} pending requests)", pending);
});
let this = Self {
bot,
queue: tx,
info_tx,
};
tokio::spawn(worker);
this
}
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
///
/// `queue_full` function is called when internal queue is full.
pub fn spawn_with_queue_full_fn<F, Fut>(bot: B, limits: Limits, queue_full: F) -> Self
where
F: Fn(usize) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
// with_queue_full_fn copypasted here to avoid [rust-lang/#76882]
//
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
let (info_tx, info_rx) = mpsc::channel(2);
let worker = worker(limits, rx, info_rx, queue_full);
let this = Self {
bot,
queue: tx,
info_tx,
};
tokio::spawn(worker);
this
}
/// Allows to access inner bot
pub fn inner(&self) -> &B {
&self.bot
}
/// Unwraps inner bot
pub fn into_inner(self) -> B {
self.bot
}
/// Returns currently used [`Limits`].
pub async fn limits(&self) -> Limits {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(InfoMessage::GetLimits { response: tx })
.await
.expect(WORKER_DIED);
rx.await.expect(WORKER_DIED)
}
/// Sets new limits.
///
/// Note: changes may not be applied imidiately.
pub async fn set_limits(&self, new: Limits) {
let (tx, rx) = oneshot::channel();
self.info_tx
.send(InfoMessage::SetLimits { new, response: tx })
.await
.ok();
rx.await.ok();
}
}
const WORKER_DIED: &str = "worker died before last `Throttle` instance";
/// Telegram request limits.
///
/// This struct is used in [`Throttle`].
///
/// Note that you may ask telegram [@BotSupport] to increase limits for your
/// particular bot if it has a lot of users (but they may or may not do that).
///
/// [@BotSupport]: https://t.me/botsupport
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Limits {
/// Allowed messages in one chat per second.
pub messages_per_sec_chat: u32,
/// Allowed messages in one chat per minute.
pub messages_per_min_chat: u32,
/// Allowed messages per second.
pub messages_per_sec_overall: u32,
}
/// Defaults are taken from [telegram documentation][tgdoc].
///
/// [tgdoc]: https://core.telegram.org/bots/faq#my-bot-is-hitting-limits-how-do-i-avoid-this
impl Default for Limits {
fn default() -> Self {
Self {
messages_per_sec_chat: 1,
messages_per_sec_overall: 30,
messages_per_min_chat: 20,
}
}
}
const MINUTE: Duration = Duration::from_secs(60);
const SECOND: Duration = Duration::from_secs(1);
// Delay between worker iterations.
//
// For now it's `second/4`, but that number is chosen pretty randomly, we may
// want to change this.
const DELAY: Duration = Duration::from_millis(250);
/// Minimal time beetween calls to queue_full function
const QUEUE_FULL_DELAY: Duration = Duration::from_secs(4);
#[derive(Debug)]
enum InfoMessage {
GetLimits { response: Sender<Limits> },
SetLimits { new: Limits, response: Sender<()> },
}
type RequestsSent = u32;
@ -164,7 +261,58 @@ struct RequestsSentToChats {
per_sec: HashMap<ChatIdHash, RequestsSent>,
}
async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>) {
// Throttling is quite complicated. This comment describes the algorithm of the
// current implementation.
//
// ### Request
//
// When a throttling request is sent, it sends a tuple of `ChatId` and
// `Sender<()>` to the worker. Then the request waits for a notification from
// the worker. When notification is received, it sends the underlying request.
//
// ### Worker
//
// The worker does the most important job -- it ensures that the limits are
// never exceeded.
//
// The worker stores a history of requests sent in the last minute (and to which
// chats they were sent) and a queue of pending updates.
//
// The worker does the following algorithm loop:
//
// 1. If the queue is empty, wait for the first message in incoming channel (and
// add it to the queue).
//
// 2. Read all present messages from an incoming channel and transfer them to
// the queue.
//
// 3. Record the current time.
//
// 4. Clear the history from records whose time < (current time - minute).
//
// 5. Count all requests which were sent last second, `allowed =
// limit.messages_per_sec_overall - count`.
//
// 6. If `allowed == 0` wait a bit and `continue` to the next iteration.
//
// 7. Count how many requests were sent to which chats (i.e.: create
// `Map<ChatId, Count>`). (Note: the same map, but for last minute also exists,
// but it's updated, instead of recreation.)
//
// 8. While `allowed >= 0` search for requests which chat haven't exceed the
// limits (i.e.: map[chat] < limit), if one is found, decrease `allowed`, notify
// the request that it can be now executed, increase counts, add record to the
// history.
async fn worker<F, Fut>(
mut limits: Limits,
mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)>,
mut info_rx: mpsc::Receiver<InfoMessage>,
queue_full: F,
) where
F: Fn(usize) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
// FIXME(waffle): Make an research about data structures for this queue.
// Currently this is O(n) removing (n = number of elements
// stayed), amortized O(1) push (vec+vecrem).
@ -176,9 +324,28 @@ async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)
let mut rx_is_closed = false;
let mut last_queue_full = Instant::now()
.checked_sub(QUEUE_FULL_DELAY)
.unwrap_or_else(Instant::now);
while !rx_is_closed || !queue.is_empty() {
// FIXME(waffle):
// 1. If the `queue` is empty, `read_from_rx` call down below will 'block'
// execution untill a request is sent. While the execution is 'blocked' no
// `InfoMessage`s could be answered.
//
// 2. If limits are descreased, ideally we want to shrink queue.
//
// *blocked in asyncronous way
answer_info(&mut info_rx, &mut limits);
read_from_rx(&mut rx, &mut queue, &mut rx_is_closed).await;
debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
//debug_assert_eq!(queue.capacity(), limits.messages_per_sec_overall as usize);
if queue.len() == queue.capacity() && last_queue_full.elapsed() > QUEUE_FULL_DELAY {
last_queue_full = Instant::now();
tokio::spawn(queue_full(queue.len()));
}
// _Maybe_ we need to use `spawn_blocking` here, because there is
// decent amount of blocking work. However _for now_ I've decided not
@ -287,6 +454,21 @@ async fn worker(limits: Limits, mut rx: mpsc::Receiver<(ChatIdHash, RequestLock)
}
}
fn answer_info(rx: &mut mpsc::Receiver<InfoMessage>, limits: &mut Limits) {
// FIXME(waffle): https://github.com/tokio-rs/tokio/issues/3350
while let Some(Some(req)) = rx.recv().now_or_never() {
// Errors are ignored with .ok(). Error means that the response channel
// is closed and the response isn't needed.
match req {
InfoMessage::GetLimits { response } => response.send(*limits).ok(),
InfoMessage::SetLimits { new, response } => {
*limits = new;
response.send(()).ok()
}
};
}
}
async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_closed: &mut bool) {
if queue.is_empty() {
match rx.recv().await {
@ -307,55 +489,6 @@ async fn read_from_rx<T>(rx: &mut mpsc::Receiver<T>, queue: &mut Vec<T>, rx_is_c
}
}
impl<B> Throttle<B> {
/// Creates new [`Throttle`] alongside with worker future.
///
/// Note: [`Throttle`] will only send requests if returned worker is
/// polled/spawned/awaited.
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>) {
let (tx, rx) = mpsc::channel(limits.messages_per_sec_overall as usize);
let worker = worker(limits, rx);
let this = Self { bot, queue: tx };
(this, worker)
}
/// Creates new [`Throttle`] spawning the worker with `tokio::spawn`
///
/// Note: it's recommended to use [`RequesterExt::throttle`] instead.
///
/// [`RequesterExt::throttle`]: crate::requests::RequesterExt::throttle
pub fn new_spawn(bot: B, limits: Limits) -> Self
where
// Basically, I hate this bound.
// This is yet another problem caused by [rust-lang/#76882].
// And I think it *is* a bug.
//
// [rust-lang/#76882]: https://github.com/rust-lang/rust/issues/76882
//
// Though crucially I can't think of a case with non-static bot.
// But anyway, it doesn't change the fact that this bound is redundant.
//
// (waffle)
B: 'static,
{
let (this, worker) = Self::new(bot, limits);
tokio::spawn(worker);
this
}
/// Allows to access inner bot
pub fn inner(&self) -> &B {
&self.bot
}
/// Unwraps inner bot
pub fn into_inner(self) -> B {
self.bot
}
}
macro_rules! f {
($m:ident $this:ident ($($arg:ident : $T:ty),*)) => {
ThrottlingRequest {