Merge pull request #1185 from LasterAlex/fix-stack-overflow

Fixed stack overflow and added stack_size setter
This commit is contained in:
Lewis Pearson 2024-10-22 00:12:27 +00:00 committed by GitHub
commit 24a8d5f9d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 78 additions and 28 deletions

View file

@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `bot.forward`, `bot.edit_live_location`, `bot.stop_live_location`, `bot.set_reaction`, `bot.pin`, `bot.unpin`, `bot.edit_text`, `bot.edit_caption`, `bot.edit_media`, `bot.edit_reply_markup`, `bot.stop_poll_message`, `bot.delete` and `bot.copy` methods to the new `crate::sugar::bot::BotMessagesExt` trait - `bot.forward`, `bot.edit_live_location`, `bot.stop_live_location`, `bot.set_reaction`, `bot.pin`, `bot.unpin`, `bot.edit_text`, `bot.edit_caption`, `bot.edit_media`, `bot.edit_reply_markup`, `bot.stop_poll_message`, `bot.delete` and `bot.copy` methods to the new `crate::sugar::bot::BotMessagesExt` trait
- `req.reply_to` method to the new `crate::sugar::request::RequestReplyExt` trait - `req.reply_to` method to the new `crate::sugar::request::RequestReplyExt` trait
- `req.disable_link_preview` method to the new `crate::sugar::request::RequestLinkPreviewExt` trait - `req.disable_link_preview` method to the new `crate::sugar::request::RequestLinkPreviewExt` trait
- `stack_size` setter to `DispatcherBuilder` ([PR 1185](https://github.com/teloxide/teloxide/pull/1185))
- `utils::render` module to render HTML/Markdown-formatted output ([PR 1152](https://github.com/teloxide/teloxide/pull/1152)) - `utils::render` module to render HTML/Markdown-formatted output ([PR 1152](https://github.com/teloxide/teloxide/pull/1152))
### Changed ### Changed
@ -23,12 +24,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- MSRV (Minimal Supported Rust Version) was bumped from `1.70.0` to `1.80.0` - MSRV (Minimal Supported Rust Version) was bumped from `1.70.0` to `1.80.0`
- Some dependencies was bumped: `sqlx` to `0.8.1`, `tower` to `0.5.0`, `reqwest` to `0.12.7` - Some dependencies was bumped: `sqlx` to `0.8.1`, `tower` to `0.5.0`, `reqwest` to `0.12.7`
- `tokio` version was explicitly specified as `1.39` - `tokio` version was explicitly specified as `1.39`
- Added new `Send` and `Sync` trait bounds to the `UListener` and `Eh` generic parameters of `try_dispatch_with_listener` and `dispatch_with_listener` ([PR 1185](https://github.com/teloxide/teloxide/pull/1185)) [**BC**]
### Fixed ### Fixed
- Now Vec<MessageId> in requests serializes into [number] instead of [ {message_id: number} ], `forward_messages`, `copy_messages` and `delete_messages` now work properly - Now Vec<MessageId> in requests serializes into [number] instead of [ {message_id: number} ], `forward_messages`, `copy_messages` and `delete_messages` now work properly
- Now `InlineQueryResultsButton` serializes properly ([issue 1181](https://github.com/teloxide/teloxide/issues/1181)) - Now `InlineQueryResultsButton` serializes properly ([issue 1181](https://github.com/teloxide/teloxide/issues/1181))
- Now `ThreadId` is able to serialize in multipart requests ([PR 1179](https://github.com/teloxide/teloxide/pull/1179)) - Now `ThreadId` is able to serialize in multipart requests ([PR 1179](https://github.com/teloxide/teloxide/pull/1179))
- Now stack does not overflow on dispatch ([issue 1154](https://github.com/teloxide/teloxide/issues/1154))
## 0.13.0 - 2024-08-16 ## 0.13.0 - 2024-08-16

View file

@ -89,7 +89,7 @@ dptree = "0.3.0"
# Uncomment this if you want to test teloxide with a specific dptree commit # Uncomment this if you want to test teloxide with a specific dptree commit
# dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" } # dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" }
tokio = { version = "1.39", features = ["fs"] } tokio = { version = "1.39", features = ["fs", "rt-multi-thread"] }
tokio-util = "0.7" tokio-util = "0.7"
tokio-stream = "0.1.8" tokio-stream = "0.1.8"

View file

@ -5,6 +5,7 @@ use crate::{
}, },
error_handlers::{ErrorHandler, LoggingErrorHandler}, error_handlers::{ErrorHandler, LoggingErrorHandler},
requests::{Request, Requester}, requests::{Request, Requester},
stop::StopToken,
types::{Update, UpdateKind}, types::{Update, UpdateKind},
update_listeners::{self, UpdateListener}, update_listeners::{self, UpdateListener},
}; };
@ -44,6 +45,7 @@ pub struct DispatcherBuilder<R, Err, Key> {
ctrlc_handler: bool, ctrlc_handler: bool,
distribution_f: fn(&Update) -> Option<Key>, distribution_f: fn(&Update) -> Option<Key>,
worker_queue_size: usize, worker_queue_size: usize,
stack_size: usize,
} }
impl<R, Err, Key> DispatcherBuilder<R, Err, Key> impl<R, Err, Key> DispatcherBuilder<R, Err, Key>
@ -104,6 +106,14 @@ where
Self { worker_queue_size: size, ..self } Self { worker_queue_size: size, ..self }
} }
/// Specifies the stack size available to the dispatcher.
///
/// By default, it's 8 * 1024 * 1024 bytes (8 MiB).
#[must_use]
pub fn stack_size(self, size: usize) -> Self {
Self { stack_size: size, ..self }
}
/// Specifies the distribution function that decides how updates are grouped /// Specifies the distribution function that decides how updates are grouped
/// before execution. /// before execution.
/// ///
@ -176,6 +186,7 @@ where
ctrlc_handler, ctrlc_handler,
distribution_f: _, distribution_f: _,
worker_queue_size, worker_queue_size,
stack_size,
} = self; } = self;
DispatcherBuilder { DispatcherBuilder {
@ -187,6 +198,7 @@ where
ctrlc_handler, ctrlc_handler,
distribution_f: f, distribution_f: f,
worker_queue_size, worker_queue_size,
stack_size,
} }
} }
@ -202,6 +214,7 @@ where
distribution_f, distribution_f,
worker_queue_size, worker_queue_size,
ctrlc_handler, ctrlc_handler,
stack_size,
} = self; } = self;
// If the `ctrlc_handler` feature is not enabled, don't emit a warning. // If the `ctrlc_handler` feature is not enabled, don't emit a warning.
@ -216,6 +229,7 @@ where
state: ShutdownToken::new(), state: ShutdownToken::new(),
distribution_f, distribution_f,
worker_queue_size, worker_queue_size,
stack_size,
workers: HashMap::new(), workers: HashMap::new(),
default_worker: None, default_worker: None,
current_number_of_active_workers: Default::default(), current_number_of_active_workers: Default::default(),
@ -258,6 +272,7 @@ pub struct Dispatcher<R, Err, Key> {
distribution_f: fn(&Update) -> Option<Key>, distribution_f: fn(&Update) -> Option<Key>,
worker_queue_size: usize, worker_queue_size: usize,
stack_size: usize,
current_number_of_active_workers: Arc<AtomicU32>, current_number_of_active_workers: Arc<AtomicU32>,
max_number_of_active_workers: Arc<AtomicU32>, max_number_of_active_workers: Arc<AtomicU32>,
// Tokio TX channel parts associated with chat IDs that consume updates sequentially. // Tokio TX channel parts associated with chat IDs that consume updates sequentially.
@ -297,6 +312,7 @@ where
Err: Debug, Err: Debug,
{ {
const DEFAULT_WORKER_QUEUE_SIZE: usize = 64; const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
const DEFAULT_STACK_SIZE: usize = 8 * 1024 * 1024;
DispatcherBuilder { DispatcherBuilder {
bot, bot,
@ -310,6 +326,7 @@ where
ctrlc_handler: false, ctrlc_handler: false,
worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE, worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
distribution_f: default_distribution_function, distribution_f: default_distribution_function,
stack_size: DEFAULT_STACK_SIZE,
} }
} }
} }
@ -318,7 +335,7 @@ impl<R, Err, Key> Dispatcher<R, Err, Key>
where where
R: Requester + Clone + Send + Sync + 'static, R: Requester + Clone + Send + Sync + 'static,
Err: Send + Sync + 'static, Err: Send + Sync + 'static,
Key: Hash + Eq + Clone, Key: Hash + Eq + Clone + Send,
{ {
/// Starts your bot with the default parameters. /// Starts your bot with the default parameters.
/// ///
@ -355,8 +372,8 @@ where
update_listener: UListener, update_listener: UListener,
update_listener_error_handler: Arc<Eh>, update_listener_error_handler: Arc<Eh>,
) where ) where
UListener: UpdateListener + 'a, UListener: UpdateListener + Send + 'a,
Eh: ErrorHandler<UListener::Err> + 'a, Eh: ErrorHandler<UListener::Err> + Send + Sync + 'a,
UListener::Err: Debug, UListener::Err: Debug,
{ {
self.try_dispatch_with_listener(update_listener, update_listener_error_handler) self.try_dispatch_with_listener(update_listener, update_listener_error_handler)
@ -377,8 +394,8 @@ where
update_listener_error_handler: Arc<Eh>, update_listener_error_handler: Arc<Eh>,
) -> Result<(), R::Err> ) -> Result<(), R::Err>
where where
UListener: UpdateListener + 'a, UListener: UpdateListener + Send + 'a,
Eh: ErrorHandler<UListener::Err> + 'a, Eh: ErrorHandler<UListener::Err> + Send + Sync + 'a,
UListener::Err: Debug, UListener::Err: Debug,
{ {
// FIXME: there should be a way to check if dependency is already inserted // FIXME: there should be a way to check if dependency is already inserted
@ -391,33 +408,64 @@ where
log::debug!("hinting allowed updates: {:?}", allowed_updates); log::debug!("hinting allowed updates: {:?}", allowed_updates);
update_listener.hint_allowed_updates(&mut allowed_updates.into_iter()); update_listener.hint_allowed_updates(&mut allowed_updates.into_iter());
let mut stop_token = Some(update_listener.stop_token()); let stop_token = Some(update_listener.stop_token());
// We create a new Tokio runtime in order to set the correct stack size. We do
// it a scoped thread because Tokio runtimes cannot be nested. We need a scoped
// thread because of the lifetime `'a` in `&'a mut self` and because scoped
// threads are automatically joined. See this issue:
// <https://github.com/teloxide/teloxide/issues/1154>.
std::thread::scope(|scope: &std::thread::Scope<'_, '_>| {
scope.spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_stack_size(self.stack_size)
.enable_all()
.build()
.unwrap();
runtime.block_on(self.start_listening(
update_listener,
update_listener_error_handler,
stop_token,
));
});
});
Ok(())
}
async fn start_listening<'a, UListener, Eh>(
&'a mut self,
mut update_listener: UListener,
update_listener_error_handler: Arc<Eh>,
mut stop_token: Option<StopToken>,
) where
UListener: UpdateListener + 'a,
Eh: ErrorHandler<UListener::Err> + 'a,
UListener::Err: Debug,
{
self.state.start_dispatching(); self.state.start_dispatching();
{ let stream = update_listener.as_stream();
let stream = update_listener.as_stream(); tokio::pin!(stream);
tokio::pin!(stream);
loop { loop {
self.remove_inactive_workers_if_needed().await; self.remove_inactive_workers_if_needed().await;
let res = future::select(stream.next(), pin!(self.state.wait_for_changes())) let res = future::select(stream.next(), pin!(self.state.wait_for_changes()))
.map(either) .map(either)
.await .await
.map_either(|l| l.0, |r| r.0); .map_either(|l| l.0, |r| r.0);
match res { match res {
Either::Left(upd) => match upd { Either::Left(upd) => match upd {
Some(upd) => self.process_update(upd, &update_listener_error_handler).await, Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
None => break, None => break,
}, },
Either::Right(()) => { Either::Right(()) => {
if self.state.is_shutting_down() { if self.state.is_shutting_down() {
if let Some(token) = stop_token.take() { if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching..."); log::debug!("Start shutting down dispatching...");
token.stop(); token.stop();
}
} }
} }
} }
@ -435,7 +483,6 @@ where
.await; .await;
self.state.done(); self.state.done();
Ok(())
} }
async fn process_update<LErr, LErrHandler>( async fn process_update<LErr, LErrHandler>(