From c9a6b77434ccb52942b96779e464675c17f9bb5d Mon Sep 17 00:00:00 2001 From: Maybe Waffle Date: Wed, 13 Apr 2022 16:17:06 +0400 Subject: [PATCH] Make worker queue size in dp configurable --- Cargo.toml | 2 +- src/dispatching/dispatcher.rs | 38 +++++++++++++++++++++++++++++------ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 013f97bd..3ff6a2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "teloxide" version = "0.7.2" -edition = "2018" +edition = "2021" description = "An elegant Telegram bots framework for Rust" repository = "https://github.com/teloxide/teloxide" documentation = "https://docs.rs/teloxide/" diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index c6f14616..2ff6470e 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -32,6 +32,7 @@ pub struct DispatcherBuilder { handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, + worker_queue_size: usize, } impl DispatcherBuilder @@ -75,6 +76,14 @@ where Self { dependencies, ..self } } + /// Specifies size of the queue for workers. + /// + /// By default it's 64. + #[must_use] + pub fn worker_queue_size(self, size: usize) -> Self { + Self { worker_queue_size: size, ..self } + } + /// Constructs [`Dispatcher`]. #[must_use] pub fn build(self) -> Dispatcher { @@ -86,6 +95,7 @@ where error_handler: self.error_handler, allowed_updates: Default::default(), state: ShutdownToken::new(), + worker_queue_size: self.worker_queue_size, workers: HashMap::new(), default_worker: None, } @@ -104,6 +114,7 @@ pub struct Dispatcher { handler: Arc>, default_handler: DefaultHandler, + worker_queue_size: usize, // Tokio TX channel parts associated with chat IDs that consume updates sequentially. workers: HashMap, // The default TX part that consume updates concurrently. @@ -140,6 +151,8 @@ where where Err: Debug, { + const DEFAULT_WORKER_QUEUE_SIZE: usize = 64; + DispatcherBuilder { bot, dependencies: DependencyMap::new(), @@ -149,6 +162,7 @@ where Box::pin(async {}) }), error_handler: LoggingErrorHandler::new(), + worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE, } } @@ -270,10 +284,22 @@ where let worker = match upd.chat() { Some(chat) => self.workers.entry(chat.id).or_insert_with(|| { - spawn_worker(deps, handler, default_handler, error_handler) + spawn_worker( + deps, + handler, + default_handler, + error_handler, + self.worker_queue_size, + ) }), None => self.default_worker.get_or_insert_with(|| { - spawn_default_worker(deps, handler, default_handler, error_handler) + spawn_default_worker( + deps, + handler, + default_handler, + error_handler, + self.worker_queue_size, + ) }), }; @@ -316,18 +342,17 @@ where } } -const WORKER_QUEUE_SIZE: usize = 64; - fn spawn_worker( deps: DependencyMap, handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, + queue_size: usize, ) -> Worker where Err: Send + Sync + 'static, { - let (tx, rx) = tokio::sync::mpsc::channel(WORKER_QUEUE_SIZE); + let (tx, rx) = tokio::sync::mpsc::channel(queue_size); let deps = Arc::new(deps); @@ -348,11 +373,12 @@ fn spawn_default_worker( handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, + queue_size: usize, ) -> Worker where Err: Send + Sync + 'static, { - let (tx, rx) = tokio::sync::mpsc::channel(WORKER_QUEUE_SIZE); + let (tx, rx) = tokio::sync::mpsc::channel(queue_size); let deps = Arc::new(deps);