Make worker queue size in dp configurable

This commit is contained in:
Maybe Waffle 2022-04-13 16:17:06 +04:00
parent 050ede23fa
commit c9a6b77434
2 changed files with 33 additions and 7 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "teloxide" name = "teloxide"
version = "0.7.2" version = "0.7.2"
edition = "2018" edition = "2021"
description = "An elegant Telegram bots framework for Rust" description = "An elegant Telegram bots framework for Rust"
repository = "https://github.com/teloxide/teloxide" repository = "https://github.com/teloxide/teloxide"
documentation = "https://docs.rs/teloxide/" documentation = "https://docs.rs/teloxide/"

View file

@ -32,6 +32,7 @@ pub struct DispatcherBuilder<R, Err> {
handler: Arc<UpdateHandler<Err>>, handler: Arc<UpdateHandler<Err>>,
default_handler: DefaultHandler, default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>, error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
worker_queue_size: usize,
} }
impl<R, Err> DispatcherBuilder<R, Err> impl<R, Err> DispatcherBuilder<R, Err>
@ -75,6 +76,14 @@ where
Self { dependencies, ..self } Self { dependencies, ..self }
} }
/// Specifies size of the queue for workers.
///
/// By default it's 64.
#[must_use]
pub fn worker_queue_size(self, size: usize) -> Self {
Self { worker_queue_size: size, ..self }
}
/// Constructs [`Dispatcher`]. /// Constructs [`Dispatcher`].
#[must_use] #[must_use]
pub fn build(self) -> Dispatcher<R, Err> { pub fn build(self) -> Dispatcher<R, Err> {
@ -86,6 +95,7 @@ where
error_handler: self.error_handler, error_handler: self.error_handler,
allowed_updates: Default::default(), allowed_updates: Default::default(),
state: ShutdownToken::new(), state: ShutdownToken::new(),
worker_queue_size: self.worker_queue_size,
workers: HashMap::new(), workers: HashMap::new(),
default_worker: None, default_worker: None,
} }
@ -104,6 +114,7 @@ pub struct Dispatcher<R, Err> {
handler: Arc<UpdateHandler<Err>>, handler: Arc<UpdateHandler<Err>>,
default_handler: DefaultHandler, default_handler: DefaultHandler,
worker_queue_size: usize,
// Tokio TX channel parts associated with chat IDs that consume updates sequentially. // Tokio TX channel parts associated with chat IDs that consume updates sequentially.
workers: HashMap<ChatId, Worker>, workers: HashMap<ChatId, Worker>,
// The default TX part that consume updates concurrently. // The default TX part that consume updates concurrently.
@ -140,6 +151,8 @@ where
where where
Err: Debug, Err: Debug,
{ {
const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
DispatcherBuilder { DispatcherBuilder {
bot, bot,
dependencies: DependencyMap::new(), dependencies: DependencyMap::new(),
@ -149,6 +162,7 @@ where
Box::pin(async {}) Box::pin(async {})
}), }),
error_handler: LoggingErrorHandler::new(), error_handler: LoggingErrorHandler::new(),
worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
} }
} }
@ -270,10 +284,22 @@ where
let worker = match upd.chat() { let worker = match upd.chat() {
Some(chat) => self.workers.entry(chat.id).or_insert_with(|| { Some(chat) => self.workers.entry(chat.id).or_insert_with(|| {
spawn_worker(deps, handler, default_handler, error_handler) spawn_worker(
deps,
handler,
default_handler,
error_handler,
self.worker_queue_size,
)
}), }),
None => self.default_worker.get_or_insert_with(|| { None => self.default_worker.get_or_insert_with(|| {
spawn_default_worker(deps, handler, default_handler, error_handler) spawn_default_worker(
deps,
handler,
default_handler,
error_handler,
self.worker_queue_size,
)
}), }),
}; };
@ -316,18 +342,17 @@ where
} }
} }
const WORKER_QUEUE_SIZE: usize = 64;
fn spawn_worker<Err>( fn spawn_worker<Err>(
deps: DependencyMap, deps: DependencyMap,
handler: Arc<UpdateHandler<Err>>, handler: Arc<UpdateHandler<Err>>,
default_handler: DefaultHandler, default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>, error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
queue_size: usize,
) -> Worker ) -> Worker
where where
Err: Send + Sync + 'static, Err: Send + Sync + 'static,
{ {
let (tx, rx) = tokio::sync::mpsc::channel(WORKER_QUEUE_SIZE); let (tx, rx) = tokio::sync::mpsc::channel(queue_size);
let deps = Arc::new(deps); let deps = Arc::new(deps);
@ -348,11 +373,12 @@ fn spawn_default_worker<Err>(
handler: Arc<UpdateHandler<Err>>, handler: Arc<UpdateHandler<Err>>,
default_handler: DefaultHandler, default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>, error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
queue_size: usize,
) -> Worker ) -> Worker
where where
Err: Send + Sync + 'static, Err: Send + Sync + 'static,
{ {
let (tx, rx) = tokio::sync::mpsc::channel(WORKER_QUEUE_SIZE); let (tx, rx) = tokio::sync::mpsc::channel(queue_size);
let deps = Arc::new(deps); let deps = Arc::new(deps);