mirror of
https://github.com/teloxide/teloxide.git
synced 2024-12-22 14:35:36 +01:00
Added stack size variability on dispatch
This commit is contained in:
parent
435257244c
commit
f8be5ccd8c
3 changed files with 74 additions and 28 deletions
|
@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
- `bot.forward`, `bot.edit_live_location`, `bot.stop_live_location`, `bot.set_reaction`, `bot.pin`, `bot.unpin`, `bot.edit_text`, `bot.edit_caption`, `bot.edit_media`, `bot.edit_reply_markup`, `bot.stop_poll_message`, `bot.delete` and `bot.copy` methods to the new `crate::sugar::bot::BotMessagesExt` trait
|
- `bot.forward`, `bot.edit_live_location`, `bot.stop_live_location`, `bot.set_reaction`, `bot.pin`, `bot.unpin`, `bot.edit_text`, `bot.edit_caption`, `bot.edit_media`, `bot.edit_reply_markup`, `bot.stop_poll_message`, `bot.delete` and `bot.copy` methods to the new `crate::sugar::bot::BotMessagesExt` trait
|
||||||
- `req.reply_to` method to the new `crate::sugar::request::RequestReplyExt` trait
|
- `req.reply_to` method to the new `crate::sugar::request::RequestReplyExt` trait
|
||||||
- `req.disable_link_preview` method to the new `crate::sugar::request::RequestLinkPreviewExt` trait
|
- `req.disable_link_preview` method to the new `crate::sugar::request::RequestLinkPreviewExt` trait
|
||||||
|
- `stack_size` setter to `DispatcherBuilder`
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
- Now Vec<MessageId> in requests serializes into [number] instead of [ {message_id: number} ], `forward_messages`, `copy_messages` and `delete_messages` now work properly
|
- Now Vec<MessageId> in requests serializes into [number] instead of [ {message_id: number} ], `forward_messages`, `copy_messages` and `delete_messages` now work properly
|
||||||
- Now `InlineQueryResultsButton` serializes properly ([issue 1181](https://github.com/teloxide/teloxide/issues/1181))
|
- Now `InlineQueryResultsButton` serializes properly ([issue 1181](https://github.com/teloxide/teloxide/issues/1181))
|
||||||
- Now `ThreadId` is able to serialize in multipart requests ([PR 1179](https://github.com/teloxide/teloxide/pull/1179))
|
- Now `ThreadId` is able to serialize in multipart requests ([PR 1179](https://github.com/teloxide/teloxide/pull/1179))
|
||||||
|
- Now stack does not overflow on dispatch ([issue 1154](https://github.com/teloxide/teloxide/issues/1154))
|
||||||
|
|
||||||
## 0.13.0 - 2024-08-16
|
## 0.13.0 - 2024-08-16
|
||||||
|
|
||||||
|
|
|
@ -89,7 +89,7 @@ dptree = "0.3.0"
|
||||||
# Uncomment this if you want to test teloxide with a specific dptree commit
|
# Uncomment this if you want to test teloxide with a specific dptree commit
|
||||||
# dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" }
|
# dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" }
|
||||||
|
|
||||||
tokio = { version = "1.39", features = ["fs"] }
|
tokio = { version = "1.39", features = ["fs", "rt-multi-thread"] }
|
||||||
tokio-util = "0.7"
|
tokio-util = "0.7"
|
||||||
tokio-stream = "0.1.8"
|
tokio-stream = "0.1.8"
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ use crate::{
|
||||||
},
|
},
|
||||||
error_handlers::{ErrorHandler, LoggingErrorHandler},
|
error_handlers::{ErrorHandler, LoggingErrorHandler},
|
||||||
requests::{Request, Requester},
|
requests::{Request, Requester},
|
||||||
|
stop::StopToken,
|
||||||
types::{Update, UpdateKind},
|
types::{Update, UpdateKind},
|
||||||
update_listeners::{self, UpdateListener},
|
update_listeners::{self, UpdateListener},
|
||||||
};
|
};
|
||||||
|
@ -16,6 +17,7 @@ use futures::{
|
||||||
stream::FuturesUnordered,
|
stream::FuturesUnordered,
|
||||||
FutureExt as _, StreamExt as _,
|
FutureExt as _, StreamExt as _,
|
||||||
};
|
};
|
||||||
|
use tokio::runtime::Builder;
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
@ -29,6 +31,7 @@ use std::{
|
||||||
atomic::{AtomicBool, AtomicU32, Ordering},
|
atomic::{AtomicBool, AtomicU32, Ordering},
|
||||||
Arc,
|
Arc,
|
||||||
},
|
},
|
||||||
|
thread,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// The builder for [`Dispatcher`].
|
/// The builder for [`Dispatcher`].
|
||||||
|
@ -44,6 +47,7 @@ pub struct DispatcherBuilder<R, Err, Key> {
|
||||||
ctrlc_handler: bool,
|
ctrlc_handler: bool,
|
||||||
distribution_f: fn(&Update) -> Option<Key>,
|
distribution_f: fn(&Update) -> Option<Key>,
|
||||||
worker_queue_size: usize,
|
worker_queue_size: usize,
|
||||||
|
stack_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R, Err, Key> DispatcherBuilder<R, Err, Key>
|
impl<R, Err, Key> DispatcherBuilder<R, Err, Key>
|
||||||
|
@ -104,6 +108,14 @@ where
|
||||||
Self { worker_queue_size: size, ..self }
|
Self { worker_queue_size: size, ..self }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Specifies the stack size of the dispatcher.
|
||||||
|
///
|
||||||
|
/// By default it's 8 * 1024 * 1024 bytes (8 MiB).
|
||||||
|
#[must_use]
|
||||||
|
pub fn stack_size(self, size: usize) -> Self {
|
||||||
|
Self { stack_size: size, ..self }
|
||||||
|
}
|
||||||
|
|
||||||
/// Specifies the distribution function that decides how updates are grouped
|
/// Specifies the distribution function that decides how updates are grouped
|
||||||
/// before execution.
|
/// before execution.
|
||||||
///
|
///
|
||||||
|
@ -176,6 +188,7 @@ where
|
||||||
ctrlc_handler,
|
ctrlc_handler,
|
||||||
distribution_f: _,
|
distribution_f: _,
|
||||||
worker_queue_size,
|
worker_queue_size,
|
||||||
|
stack_size,
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
DispatcherBuilder {
|
DispatcherBuilder {
|
||||||
|
@ -187,6 +200,7 @@ where
|
||||||
ctrlc_handler,
|
ctrlc_handler,
|
||||||
distribution_f: f,
|
distribution_f: f,
|
||||||
worker_queue_size,
|
worker_queue_size,
|
||||||
|
stack_size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,6 +216,7 @@ where
|
||||||
distribution_f,
|
distribution_f,
|
||||||
worker_queue_size,
|
worker_queue_size,
|
||||||
ctrlc_handler,
|
ctrlc_handler,
|
||||||
|
stack_size,
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
// If the `ctrlc_handler` feature is not enabled, don't emit a warning.
|
// If the `ctrlc_handler` feature is not enabled, don't emit a warning.
|
||||||
|
@ -216,6 +231,7 @@ where
|
||||||
state: ShutdownToken::new(),
|
state: ShutdownToken::new(),
|
||||||
distribution_f,
|
distribution_f,
|
||||||
worker_queue_size,
|
worker_queue_size,
|
||||||
|
stack_size,
|
||||||
workers: HashMap::new(),
|
workers: HashMap::new(),
|
||||||
default_worker: None,
|
default_worker: None,
|
||||||
current_number_of_active_workers: Default::default(),
|
current_number_of_active_workers: Default::default(),
|
||||||
|
@ -258,6 +274,7 @@ pub struct Dispatcher<R, Err, Key> {
|
||||||
|
|
||||||
distribution_f: fn(&Update) -> Option<Key>,
|
distribution_f: fn(&Update) -> Option<Key>,
|
||||||
worker_queue_size: usize,
|
worker_queue_size: usize,
|
||||||
|
stack_size: usize,
|
||||||
current_number_of_active_workers: Arc<AtomicU32>,
|
current_number_of_active_workers: Arc<AtomicU32>,
|
||||||
max_number_of_active_workers: Arc<AtomicU32>,
|
max_number_of_active_workers: Arc<AtomicU32>,
|
||||||
// Tokio TX channel parts associated with chat IDs that consume updates sequentially.
|
// Tokio TX channel parts associated with chat IDs that consume updates sequentially.
|
||||||
|
@ -297,6 +314,7 @@ where
|
||||||
Err: Debug,
|
Err: Debug,
|
||||||
{
|
{
|
||||||
const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
|
const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
|
||||||
|
const DEFAULT_STACK_SIZE: usize = 8 * 1024 * 1024;
|
||||||
|
|
||||||
DispatcherBuilder {
|
DispatcherBuilder {
|
||||||
bot,
|
bot,
|
||||||
|
@ -310,6 +328,7 @@ where
|
||||||
ctrlc_handler: false,
|
ctrlc_handler: false,
|
||||||
worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
|
worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
|
||||||
distribution_f: default_distribution_function,
|
distribution_f: default_distribution_function,
|
||||||
|
stack_size: DEFAULT_STACK_SIZE,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -318,7 +337,7 @@ impl<R, Err, Key> Dispatcher<R, Err, Key>
|
||||||
where
|
where
|
||||||
R: Requester + Clone + Send + Sync + 'static,
|
R: Requester + Clone + Send + Sync + 'static,
|
||||||
Err: Send + Sync + 'static,
|
Err: Send + Sync + 'static,
|
||||||
Key: Hash + Eq + Clone,
|
Key: Hash + Eq + Clone + Send,
|
||||||
{
|
{
|
||||||
/// Starts your bot with the default parameters.
|
/// Starts your bot with the default parameters.
|
||||||
///
|
///
|
||||||
|
@ -355,8 +374,8 @@ where
|
||||||
update_listener: UListener,
|
update_listener: UListener,
|
||||||
update_listener_error_handler: Arc<Eh>,
|
update_listener_error_handler: Arc<Eh>,
|
||||||
) where
|
) where
|
||||||
UListener: UpdateListener + 'a,
|
UListener: UpdateListener + Send + 'a,
|
||||||
Eh: ErrorHandler<UListener::Err> + 'a,
|
Eh: ErrorHandler<UListener::Err> + Send + Sync + 'a,
|
||||||
UListener::Err: Debug,
|
UListener::Err: Debug,
|
||||||
{
|
{
|
||||||
self.try_dispatch_with_listener(update_listener, update_listener_error_handler)
|
self.try_dispatch_with_listener(update_listener, update_listener_error_handler)
|
||||||
|
@ -377,8 +396,8 @@ where
|
||||||
update_listener_error_handler: Arc<Eh>,
|
update_listener_error_handler: Arc<Eh>,
|
||||||
) -> Result<(), R::Err>
|
) -> Result<(), R::Err>
|
||||||
where
|
where
|
||||||
UListener: UpdateListener + 'a,
|
UListener: UpdateListener + Send + 'a,
|
||||||
Eh: ErrorHandler<UListener::Err> + 'a,
|
Eh: ErrorHandler<UListener::Err> + Send + Sync + 'a,
|
||||||
UListener::Err: Debug,
|
UListener::Err: Debug,
|
||||||
{
|
{
|
||||||
// FIXME: there should be a way to check if dependency is already inserted
|
// FIXME: there should be a way to check if dependency is already inserted
|
||||||
|
@ -391,33 +410,59 @@ where
|
||||||
log::debug!("hinting allowed updates: {:?}", allowed_updates);
|
log::debug!("hinting allowed updates: {:?}", allowed_updates);
|
||||||
update_listener.hint_allowed_updates(&mut allowed_updates.into_iter());
|
update_listener.hint_allowed_updates(&mut allowed_updates.into_iter());
|
||||||
|
|
||||||
let mut stop_token = Some(update_listener.stop_token());
|
let stop_token = Some(update_listener.stop_token());
|
||||||
|
|
||||||
|
thread::scope(|scope| {
|
||||||
|
scope.spawn(move || {
|
||||||
|
let runtime = Builder::new_multi_thread()
|
||||||
|
.thread_stack_size(self.stack_size)
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
runtime.block_on(self.start_listening(
|
||||||
|
update_listener,
|
||||||
|
update_listener_error_handler,
|
||||||
|
stop_token,
|
||||||
|
));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_listening<'a, UListener, Eh>(
|
||||||
|
&'a mut self,
|
||||||
|
mut update_listener: UListener,
|
||||||
|
update_listener_error_handler: Arc<Eh>,
|
||||||
|
mut stop_token: Option<StopToken>,
|
||||||
|
) where
|
||||||
|
UListener: UpdateListener + Send + 'a,
|
||||||
|
Eh: ErrorHandler<UListener::Err> + Send + Sync + 'a,
|
||||||
|
UListener::Err: Debug,
|
||||||
|
{
|
||||||
self.state.start_dispatching();
|
self.state.start_dispatching();
|
||||||
|
|
||||||
{
|
let stream = update_listener.as_stream();
|
||||||
let stream = update_listener.as_stream();
|
tokio::pin!(stream);
|
||||||
tokio::pin!(stream);
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
self.remove_inactive_workers_if_needed().await;
|
self.remove_inactive_workers_if_needed().await;
|
||||||
|
|
||||||
let res = future::select(stream.next(), pin!(self.state.wait_for_changes()))
|
let res = future::select(stream.next(), pin!(self.state.wait_for_changes()))
|
||||||
.map(either)
|
.map(either)
|
||||||
.await
|
.await
|
||||||
.map_either(|l| l.0, |r| r.0);
|
.map_either(|l| l.0, |r| r.0);
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
Either::Left(upd) => match upd {
|
Either::Left(upd) => match upd {
|
||||||
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
|
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
|
||||||
None => break,
|
None => break,
|
||||||
},
|
},
|
||||||
Either::Right(()) => {
|
Either::Right(()) => {
|
||||||
if self.state.is_shutting_down() {
|
if self.state.is_shutting_down() {
|
||||||
if let Some(token) = stop_token.take() {
|
if let Some(token) = stop_token.take() {
|
||||||
log::debug!("Start shutting down dispatching...");
|
log::debug!("Start shutting down dispatching...");
|
||||||
token.stop();
|
token.stop();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -435,7 +480,6 @@ where
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
self.state.done();
|
self.state.done();
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn process_update<LErr, LErrHandler>(
|
async fn process_update<LErr, LErrHandler>(
|
||||||
|
|
Loading…
Reference in a new issue