From f8be5ccd8c6166a17fd4b401528e2a1d949b4a73 Mon Sep 17 00:00:00 2001 From: LasterAlex Date: Sun, 20 Oct 2024 04:19:17 +0300 Subject: [PATCH 1/6] Added stack size variability on dispatch --- CHANGELOG.md | 2 + crates/teloxide/Cargo.toml | 2 +- crates/teloxide/src/dispatching/dispatcher.rs | 98 ++++++++++++++----- 3 files changed, 74 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d4d5ed8..054ce32e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `bot.forward`, `bot.edit_live_location`, `bot.stop_live_location`, `bot.set_reaction`, `bot.pin`, `bot.unpin`, `bot.edit_text`, `bot.edit_caption`, `bot.edit_media`, `bot.edit_reply_markup`, `bot.stop_poll_message`, `bot.delete` and `bot.copy` methods to the new `crate::sugar::bot::BotMessagesExt` trait - `req.reply_to` method to the new `crate::sugar::request::RequestReplyExt` trait - `req.disable_link_preview` method to the new `crate::sugar::request::RequestLinkPreviewExt` trait +- `stack_size` setter to `DispatcherBuilder` ### Changed @@ -28,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Now Vec in requests serializes into [number] instead of [ {message_id: number} ], `forward_messages`, `copy_messages` and `delete_messages` now work properly - Now `InlineQueryResultsButton` serializes properly ([issue 1181](https://github.com/teloxide/teloxide/issues/1181)) - Now `ThreadId` is able to serialize in multipart requests ([PR 1179](https://github.com/teloxide/teloxide/pull/1179)) +- Now stack does not overflow on dispatch ([issue 1154](https://github.com/teloxide/teloxide/issues/1154)) ## 0.13.0 - 2024-08-16 diff --git a/crates/teloxide/Cargo.toml b/crates/teloxide/Cargo.toml index 9c46a1e9..a5d6efdb 100644 --- a/crates/teloxide/Cargo.toml +++ b/crates/teloxide/Cargo.toml @@ -89,7 +89,7 @@ dptree = "0.3.0" # Uncomment this if you want to test teloxide with a specific dptree commit # dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" } -tokio = { version = "1.39", features = ["fs"] } +tokio = { version = "1.39", features = ["fs", "rt-multi-thread"] } tokio-util = "0.7" tokio-stream = "0.1.8" diff --git a/crates/teloxide/src/dispatching/dispatcher.rs b/crates/teloxide/src/dispatching/dispatcher.rs index ccaf697e..2f90d0e4 100644 --- a/crates/teloxide/src/dispatching/dispatcher.rs +++ b/crates/teloxide/src/dispatching/dispatcher.rs @@ -5,6 +5,7 @@ use crate::{ }, error_handlers::{ErrorHandler, LoggingErrorHandler}, requests::{Request, Requester}, + stop::StopToken, types::{Update, UpdateKind}, update_listeners::{self, UpdateListener}, }; @@ -16,6 +17,7 @@ use futures::{ stream::FuturesUnordered, FutureExt as _, StreamExt as _, }; +use tokio::runtime::Builder; use tokio_stream::wrappers::ReceiverStream; use std::{ @@ -29,6 +31,7 @@ use std::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, }, + thread, }; /// The builder for [`Dispatcher`]. @@ -44,6 +47,7 @@ pub struct DispatcherBuilder { ctrlc_handler: bool, distribution_f: fn(&Update) -> Option, worker_queue_size: usize, + stack_size: usize, } impl DispatcherBuilder @@ -104,6 +108,14 @@ where Self { worker_queue_size: size, ..self } } + /// Specifies the stack size of the dispatcher. + /// + /// By default it's 8 * 1024 * 1024 bytes (8 MiB). + #[must_use] + pub fn stack_size(self, size: usize) -> Self { + Self { stack_size: size, ..self } + } + /// Specifies the distribution function that decides how updates are grouped /// before execution. /// @@ -176,6 +188,7 @@ where ctrlc_handler, distribution_f: _, worker_queue_size, + stack_size, } = self; DispatcherBuilder { @@ -187,6 +200,7 @@ where ctrlc_handler, distribution_f: f, worker_queue_size, + stack_size, } } @@ -202,6 +216,7 @@ where distribution_f, worker_queue_size, ctrlc_handler, + stack_size, } = self; // If the `ctrlc_handler` feature is not enabled, don't emit a warning. @@ -216,6 +231,7 @@ where state: ShutdownToken::new(), distribution_f, worker_queue_size, + stack_size, workers: HashMap::new(), default_worker: None, current_number_of_active_workers: Default::default(), @@ -258,6 +274,7 @@ pub struct Dispatcher { distribution_f: fn(&Update) -> Option, worker_queue_size: usize, + stack_size: usize, current_number_of_active_workers: Arc, max_number_of_active_workers: Arc, // Tokio TX channel parts associated with chat IDs that consume updates sequentially. @@ -297,6 +314,7 @@ where Err: Debug, { const DEFAULT_WORKER_QUEUE_SIZE: usize = 64; + const DEFAULT_STACK_SIZE: usize = 8 * 1024 * 1024; DispatcherBuilder { bot, @@ -310,6 +328,7 @@ where ctrlc_handler: false, worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE, distribution_f: default_distribution_function, + stack_size: DEFAULT_STACK_SIZE, } } } @@ -318,7 +337,7 @@ impl Dispatcher where R: Requester + Clone + Send + Sync + 'static, Err: Send + Sync + 'static, - Key: Hash + Eq + Clone, + Key: Hash + Eq + Clone + Send, { /// Starts your bot with the default parameters. /// @@ -355,8 +374,8 @@ where update_listener: UListener, update_listener_error_handler: Arc, ) where - UListener: UpdateListener + 'a, - Eh: ErrorHandler + 'a, + UListener: UpdateListener + Send + 'a, + Eh: ErrorHandler + Send + Sync + 'a, UListener::Err: Debug, { self.try_dispatch_with_listener(update_listener, update_listener_error_handler) @@ -377,8 +396,8 @@ where update_listener_error_handler: Arc, ) -> Result<(), R::Err> where - UListener: UpdateListener + 'a, - Eh: ErrorHandler + 'a, + UListener: UpdateListener + Send + 'a, + Eh: ErrorHandler + Send + Sync + 'a, UListener::Err: Debug, { // FIXME: there should be a way to check if dependency is already inserted @@ -391,33 +410,59 @@ where log::debug!("hinting allowed updates: {:?}", allowed_updates); update_listener.hint_allowed_updates(&mut allowed_updates.into_iter()); - let mut stop_token = Some(update_listener.stop_token()); + let stop_token = Some(update_listener.stop_token()); + thread::scope(|scope| { + scope.spawn(move || { + let runtime = Builder::new_multi_thread() + .thread_stack_size(self.stack_size) + .enable_all() + .build() + .unwrap(); + + runtime.block_on(self.start_listening( + update_listener, + update_listener_error_handler, + stop_token, + )); + }); + }); + Ok(()) + } + + async fn start_listening<'a, UListener, Eh>( + &'a mut self, + mut update_listener: UListener, + update_listener_error_handler: Arc, + mut stop_token: Option, + ) where + UListener: UpdateListener + Send + 'a, + Eh: ErrorHandler + Send + Sync + 'a, + UListener::Err: Debug, + { self.state.start_dispatching(); - { - let stream = update_listener.as_stream(); - tokio::pin!(stream); + let stream = update_listener.as_stream(); + tokio::pin!(stream); - loop { - self.remove_inactive_workers_if_needed().await; + loop { + self.remove_inactive_workers_if_needed().await; - let res = future::select(stream.next(), pin!(self.state.wait_for_changes())) - .map(either) - .await - .map_either(|l| l.0, |r| r.0); + let res = future::select(stream.next(), pin!(self.state.wait_for_changes())) + .map(either) + .await + .map_either(|l| l.0, |r| r.0); - match res { - Either::Left(upd) => match upd { - Some(upd) => self.process_update(upd, &update_listener_error_handler).await, - None => break, - }, - Either::Right(()) => { - if self.state.is_shutting_down() { - if let Some(token) = stop_token.take() { - log::debug!("Start shutting down dispatching..."); - token.stop(); - } + match res { + Either::Left(upd) => match upd { + Some(upd) => self.process_update(upd, &update_listener_error_handler).await, + None => break, + }, + Either::Right(()) => { + if self.state.is_shutting_down() { + if let Some(token) = stop_token.take() { + log::debug!("Start shutting down dispatching..."); + token.stop(); } } } @@ -435,7 +480,6 @@ where .await; self.state.done(); - Ok(()) } async fn process_update( From 2f085906858f9e0af51dfaa315a0a485cc1493bc Mon Sep 17 00:00:00 2001 From: LasterAlex Date: Sun, 20 Oct 2024 04:48:29 +0300 Subject: [PATCH 2/6] Removed unnecessary trait bounds --- crates/teloxide/src/dispatching/dispatcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/teloxide/src/dispatching/dispatcher.rs b/crates/teloxide/src/dispatching/dispatcher.rs index 2f90d0e4..4bc8297e 100644 --- a/crates/teloxide/src/dispatching/dispatcher.rs +++ b/crates/teloxide/src/dispatching/dispatcher.rs @@ -436,8 +436,8 @@ where update_listener_error_handler: Arc, mut stop_token: Option, ) where - UListener: UpdateListener + Send + 'a, - Eh: ErrorHandler + Send + Sync + 'a, + UListener: UpdateListener + 'a, + Eh: ErrorHandler + 'a, UListener::Err: Debug, { self.state.start_dispatching(); From 89b8247cbfd42ca3ae39db339de5af9fe9813aa1 Mon Sep 17 00:00:00 2001 From: LasterAlex Date: Sun, 20 Oct 2024 12:28:49 +0300 Subject: [PATCH 3/6] Fixed changelog + more clear imports --- CHANGELOG.md | 3 ++- crates/teloxide/src/dispatching/dispatcher.rs | 6 ++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 054ce32e..58900259 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `bot.forward`, `bot.edit_live_location`, `bot.stop_live_location`, `bot.set_reaction`, `bot.pin`, `bot.unpin`, `bot.edit_text`, `bot.edit_caption`, `bot.edit_media`, `bot.edit_reply_markup`, `bot.stop_poll_message`, `bot.delete` and `bot.copy` methods to the new `crate::sugar::bot::BotMessagesExt` trait - `req.reply_to` method to the new `crate::sugar::request::RequestReplyExt` trait - `req.disable_link_preview` method to the new `crate::sugar::request::RequestLinkPreviewExt` trait -- `stack_size` setter to `DispatcherBuilder` +- `stack_size` setter to `DispatcherBuilder` ([PR 1185](https://github.com/teloxide/teloxide/pull/1185)) ### Changed @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - MSRV (Minimal Supported Rust Version) was bumped from `1.70.0` to `1.80.0` - Some dependencies was bumped: `sqlx` to `0.8.1`, `tower` to `0.5.0`, `reqwest` to `0.12.7` - `tokio` version was explicitly specified as `1.39` +- Added new `Send` and `Sync` trait bounds to `try_dispatch_with_listener` and `dispatch_with_listener` generic parameters ([PR 1185](https://github.com/teloxide/teloxide/pull/1185)) [**BC**] ### Fixed diff --git a/crates/teloxide/src/dispatching/dispatcher.rs b/crates/teloxide/src/dispatching/dispatcher.rs index 4bc8297e..728d23dd 100644 --- a/crates/teloxide/src/dispatching/dispatcher.rs +++ b/crates/teloxide/src/dispatching/dispatcher.rs @@ -17,7 +17,6 @@ use futures::{ stream::FuturesUnordered, FutureExt as _, StreamExt as _, }; -use tokio::runtime::Builder; use tokio_stream::wrappers::ReceiverStream; use std::{ @@ -31,7 +30,6 @@ use std::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, }, - thread, }; /// The builder for [`Dispatcher`]. @@ -412,9 +410,9 @@ where let stop_token = Some(update_listener.stop_token()); - thread::scope(|scope| { + std::thread::scope(|scope| { scope.spawn(move || { - let runtime = Builder::new_multi_thread() + let runtime = tokio::runtime::Builder::new_multi_thread() .thread_stack_size(self.stack_size) .enable_all() .build() From 8826733dbd6ba35f8f3d38beda4fba7777285e68 Mon Sep 17 00:00:00 2001 From: Lewis Pearson Date: Mon, 21 Oct 2024 02:11:50 -0400 Subject: [PATCH 4/6] Improve wording --- crates/teloxide/src/dispatching/dispatcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/teloxide/src/dispatching/dispatcher.rs b/crates/teloxide/src/dispatching/dispatcher.rs index 728d23dd..d70cd3eb 100644 --- a/crates/teloxide/src/dispatching/dispatcher.rs +++ b/crates/teloxide/src/dispatching/dispatcher.rs @@ -106,9 +106,9 @@ where Self { worker_queue_size: size, ..self } } - /// Specifies the stack size of the dispatcher. + /// Specifies the stack size available to the dispatcher. /// - /// By default it's 8 * 1024 * 1024 bytes (8 MiB). + /// By default, it's 8 * 1024 * 1024 bytes (8 MiB). #[must_use] pub fn stack_size(self, size: usize) -> Self { Self { stack_size: size, ..self } From cd3dcc161115c8ed6c9acb7f948f1f756d9e03af Mon Sep 17 00:00:00 2001 From: Lewis Pearson Date: Mon, 21 Oct 2024 19:39:26 -0400 Subject: [PATCH 5/6] Write a clarifying comment on Tokio's stack size --- crates/teloxide/src/dispatching/dispatcher.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/teloxide/src/dispatching/dispatcher.rs b/crates/teloxide/src/dispatching/dispatcher.rs index d70cd3eb..56dc5f6e 100644 --- a/crates/teloxide/src/dispatching/dispatcher.rs +++ b/crates/teloxide/src/dispatching/dispatcher.rs @@ -410,7 +410,12 @@ where let stop_token = Some(update_listener.stop_token()); - std::thread::scope(|scope| { + // We create a new Tokio runtime in order to set the correct stack size. We do + // it a scoped thread because Tokio runtimes cannot be nested. We need a scoped + // thread because of the lifetime `'a` in `&'a mut self` and because scoped + // threads are automatically joined. See this issue: + // . + std::thread::scope(|scope: &std::thread::Scope<'_, '_>| { scope.spawn(move || { let runtime = tokio::runtime::Builder::new_multi_thread() .thread_stack_size(self.stack_size) From 3446aecf5e7f542a7559aed54986f5e60d28dc5e Mon Sep 17 00:00:00 2001 From: Lewis Pearson Date: Mon, 21 Oct 2024 19:43:18 -0400 Subject: [PATCH 6/6] Reword the changelog a bit --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58900259..1229ca73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,7 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - MSRV (Minimal Supported Rust Version) was bumped from `1.70.0` to `1.80.0` - Some dependencies was bumped: `sqlx` to `0.8.1`, `tower` to `0.5.0`, `reqwest` to `0.12.7` - `tokio` version was explicitly specified as `1.39` -- Added new `Send` and `Sync` trait bounds to `try_dispatch_with_listener` and `dispatch_with_listener` generic parameters ([PR 1185](https://github.com/teloxide/teloxide/pull/1185)) [**BC**] +- Added new `Send` and `Sync` trait bounds to the `UListener` and `Eh` generic parameters of `try_dispatch_with_listener` and `dispatch_with_listener` ([PR 1185](https://github.com/teloxide/teloxide/pull/1185)) [**BC**] ### Fixed