From 7e34d7ea474e19b5bfbfbe93d45a3fbb6c38bcec Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 02:45:02 +0600 Subject: [PATCH 01/10] Implement concurrent update handling --- CHANGELOG.md | 4 + src/dispatching/dispatcher.rs | 161 ++++++++++++++++++++++++++++------ 2 files changed, 137 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7becfdda..34f2e5d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `BotCommands::descriptions` now returns `CommandDescriptions` instead of `String` [**BC**]. - Mark `Dialogue::new` as `#[must_use]`. +### Fixed + + - Concurrent update handling in the new dispatcher ([issue 536](https://github.com/teloxide/teloxide/issues/536)). + ### Deprecated - `HandlerFactory` and `HandlerExt::dispatch_by` in favour of `teloxide::handler!`. diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 5c93b918..a935d3b0 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -10,9 +10,15 @@ use crate::{ use dptree::di::{DependencyMap, DependencySupplier}; use futures::{future::BoxFuture, StreamExt}; -use std::{collections::HashSet, fmt::Debug, ops::ControlFlow, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + ops::{ControlFlow, Deref}, + sync::Arc, +}; use teloxide_core::{requests::Request, types::UpdateKind}; use tokio::time::timeout; +use tokio_stream::wrappers::ReceiverStream; use std::future::Future; @@ -20,8 +26,8 @@ use std::future::Future; pub struct DispatcherBuilder { bot: R, dependencies: DependencyMap, - handler: UpdateHandler, - default_handler: DefaultHandler, + handler: Arc>, + default_handler: Arc, error_handler: Arc + Send + Sync>, } @@ -42,10 +48,10 @@ where let handler = Arc::new(handler); Self { - default_handler: Box::new(move |upd| { + default_handler: Arc::new(Box::new(move |upd| { let handler = Arc::clone(&handler); Box::pin(handler(upd)) - }), + })), ..self } } @@ -72,22 +78,31 @@ where Dispatcher { bot: self.bot.clone(), dependencies: self.dependencies, - handler: self.handler, - default_handler: self.default_handler, - error_handler: self.error_handler, + handler: Arc::clone(&self.handler), + default_handler: Arc::clone(&self.default_handler), + error_handler: Arc::clone(&self.error_handler), allowed_updates: Default::default(), state: ShutdownToken::new(), + workers: HashMap::new(), + default_worker: None, } } } /// The base for update dispatching. +/// +/// Updates from different chats are handles concurrently, whereas updates from +/// the same chats are handled sequentially. If the dispatcher is unable to +/// determine a chat ID of an incoming update, it will be handled concurrently. pub struct Dispatcher { bot: R, dependencies: DependencyMap, - handler: UpdateHandler, - default_handler: DefaultHandler, + handler: Arc>, + default_handler: Arc, + workers: HashMap, + default_worker: Option, + error_handler: Arc + Send + Sync>, // TODO: respect allowed_udpates allowed_updates: HashSet, @@ -95,6 +110,8 @@ pub struct Dispatcher { state: ShutdownToken, } +type WorkerTx = tokio::sync::mpsc::Sender; + // TODO: it is allowed to return message as response on telegram request in // webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates @@ -117,11 +134,11 @@ where DispatcherBuilder { bot, dependencies: DependencyMap::new(), - handler, - default_handler: Box::new(|upd| { + handler: Arc::new(handler), + default_handler: Arc::new(Box::new(|upd| { log::warn!("Unhandled update: {:?}", upd); Box::pin(async {}) - }), + })), error_handler: LoggingErrorHandler::new(), } } @@ -205,13 +222,13 @@ where } } - // TODO: wait for executing handlers? - + self.workers.drain(); + self.default_worker.take(); self.state.done(); } async fn process_update( - &self, + &mut self, update: Result, err_handler: &Arc, ) where @@ -229,19 +246,21 @@ where return; } - let mut deps = self.dependencies.clone(); - deps.insert(upd); + let deps = self.dependencies.clone(); + let handler = Arc::clone(&self.handler); + let default_handler = Arc::clone(&self.default_handler); + let error_handler = Arc::clone(&self.error_handler); - match self.handler.dispatch(deps).await { - ControlFlow::Break(Ok(())) => {} - ControlFlow::Break(Err(err)) => { - self.error_handler.clone().handle_error(err).await - } - ControlFlow::Continue(deps) => { - let upd = deps.get(); - (self.default_handler)(upd).await; - } - } + let tx = match upd.chat() { + Some(chat) => self.workers.entry(chat.id).or_insert_with(|| { + spawn_worker(deps, handler, default_handler, error_handler) + }), + None => self.default_worker.get_or_insert_with(|| { + spawn_default_worker(deps, handler, default_handler, error_handler) + }), + }; + + tx.send(upd).await.expect("TX is dead"); } Err(err) => err_handler.clone().handle_error(err).await, } @@ -280,6 +299,92 @@ where } } +const WORKER_QUEUE_SIZE: usize = 64; + +fn spawn_worker( + deps: DependencyMap, + handler: Arc>, + default_handler: Arc, + error_handler: Arc + Send + Sync>, +) -> WorkerTx +where + Err: Send + Sync + 'static, +{ + let (tx, rx) = tokio::sync::mpsc::channel(WORKER_QUEUE_SIZE); + + let deps = Arc::new(deps); + + tokio::spawn(async move { + ReceiverStream::new(rx) + .for_each_concurrent(None, |update| { + let deps = Arc::clone(&deps); + let handler = Arc::clone(&handler); + let default_handler = Arc::clone(&default_handler); + let error_handler = Arc::clone(&error_handler); + + async move { + handle_update(update, deps, handler, default_handler, error_handler).await; + } + }) + .await; + }); + + tx +} + +fn spawn_default_worker( + deps: DependencyMap, + handler: Arc>, + default_handler: Arc, + error_handler: Arc + Send + Sync>, +) -> WorkerTx +where + Err: Send + Sync + 'static, +{ + let (tx, rx) = tokio::sync::mpsc::channel(WORKER_QUEUE_SIZE); + + let deps = Arc::new(deps); + + tokio::spawn(async move { + ReceiverStream::new(rx) + .for_each(|update| { + let deps = Arc::clone(&deps); + let handler = Arc::clone(&handler); + let default_handler = Arc::clone(&default_handler); + let error_handler = Arc::clone(&error_handler); + + async move { + handle_update(update, deps, handler, default_handler, error_handler).await; + } + }) + .await; + }); + + tx +} + +async fn handle_update( + update: Update, + deps: Arc, + handler: Arc>, + default_handler: Arc, + error_handler: Arc + Send + Sync>, +) where + Err: Send + Sync + 'static, +{ + let mut deps = deps.deref().clone(); + deps.insert(update); + + match handler.dispatch(deps).await { + ControlFlow::Break(Ok(())) => {} + ControlFlow::Break(Err(err)) => error_handler.clone().handle_error(err).await, + ControlFlow::Continue(deps) => { + let update = deps.get(); + (default_handler)(update).await; + } + } +} + #[cfg(test)] mod tests { use std::convert::Infallible; From d6521662b4677ad9854758e11d00b4a612ca34cf Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 12:07:32 +0600 Subject: [PATCH 02/10] Switch worker and default worker impls --- src/dispatching/dispatcher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index a935d3b0..d153c908 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -316,7 +316,7 @@ where tokio::spawn(async move { ReceiverStream::new(rx) - .for_each_concurrent(None, |update| { + .for_each(|update| { let deps = Arc::clone(&deps); let handler = Arc::clone(&handler); let default_handler = Arc::clone(&default_handler); @@ -347,7 +347,7 @@ where tokio::spawn(async move { ReceiverStream::new(rx) - .for_each(|update| { + .for_each_concurrent(None, |update| { let deps = Arc::clone(&deps); let handler = Arc::clone(&handler); let default_handler = Arc::clone(&default_handler); From c2bc945d273b52aec016b50c7aa7c9fecd81723e Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 12:11:02 +0600 Subject: [PATCH 03/10] Add some helpful comments --- src/dispatching/dispatcher.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index d153c908..7574fcc8 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -100,7 +100,10 @@ pub struct Dispatcher { handler: Arc>, default_handler: Arc, + + // Tokio TX channel parts associated with chat IDs that consume updates sequentially. workers: HashMap, + // The default TX part that consume updates concurrently. default_worker: Option, error_handler: Arc + Send + Sync>, From e4459c408d1bcc91d17930aae38fba5ee268ac15 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 13:38:05 +0600 Subject: [PATCH 04/10] Remove the redundant `Box` in `DefaultHandler` --- src/dispatching/dispatcher.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 7574fcc8..9e8a2e78 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -27,7 +27,7 @@ pub struct DispatcherBuilder { bot: R, dependencies: DependencyMap, handler: Arc>, - default_handler: Arc, + default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, } @@ -48,10 +48,10 @@ where let handler = Arc::new(handler); Self { - default_handler: Arc::new(Box::new(move |upd| { + default_handler: Arc::new(move |upd| { let handler = Arc::clone(&handler); Box::pin(handler(upd)) - })), + }), ..self } } @@ -99,7 +99,7 @@ pub struct Dispatcher { dependencies: DependencyMap, handler: Arc>, - default_handler: Arc, + default_handler: DefaultHandler, // Tokio TX channel parts associated with chat IDs that consume updates sequentially. workers: HashMap, @@ -121,7 +121,7 @@ type WorkerTx = tokio::sync::mpsc::Sender; /// A handler that processes updates from Telegram. pub type UpdateHandler = dptree::Handler<'static, DependencyMap, Result<(), Err>>; -type DefaultHandler = Box) -> BoxFuture<'static, ()> + Send + Sync>; +type DefaultHandler = Arc) -> BoxFuture<'static, ()> + Send + Sync>; impl Dispatcher where @@ -138,10 +138,10 @@ where bot, dependencies: DependencyMap::new(), handler: Arc::new(handler), - default_handler: Arc::new(Box::new(|upd| { + default_handler: Arc::new(|upd| { log::warn!("Unhandled update: {:?}", upd); Box::pin(async {}) - })), + }), error_handler: LoggingErrorHandler::new(), } } @@ -307,7 +307,7 @@ const WORKER_QUEUE_SIZE: usize = 64; fn spawn_worker( deps: DependencyMap, handler: Arc>, - default_handler: Arc, + default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, ) -> WorkerTx where @@ -338,7 +338,7 @@ where fn spawn_default_worker( deps: DependencyMap, handler: Arc>, - default_handler: Arc, + default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, ) -> WorkerTx where @@ -370,7 +370,7 @@ async fn handle_update( update: Update, deps: Arc, handler: Arc>, - default_handler: Arc, + default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, ) where Err: Send + Sync + 'static, From 6a83fa56044e157bd9af2f86aa327a5e83ba85a7 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 13:40:27 +0600 Subject: [PATCH 05/10] Remove redundant `async move` blocks --- src/dispatching/dispatcher.rs | 40 ++++++++++++----------------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 9e8a2e78..60771555 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -317,20 +317,14 @@ where let deps = Arc::new(deps); - tokio::spawn(async move { - ReceiverStream::new(rx) - .for_each(|update| { - let deps = Arc::clone(&deps); - let handler = Arc::clone(&handler); - let default_handler = Arc::clone(&default_handler); - let error_handler = Arc::clone(&error_handler); + tokio::spawn(ReceiverStream::new(rx).for_each(move |update| { + let deps = Arc::clone(&deps); + let handler = Arc::clone(&handler); + let default_handler = Arc::clone(&default_handler); + let error_handler = Arc::clone(&error_handler); - async move { - handle_update(update, deps, handler, default_handler, error_handler).await; - } - }) - .await; - }); + handle_update(update, deps, handler, default_handler, error_handler) + })); tx } @@ -348,20 +342,14 @@ where let deps = Arc::new(deps); - tokio::spawn(async move { - ReceiverStream::new(rx) - .for_each_concurrent(None, |update| { - let deps = Arc::clone(&deps); - let handler = Arc::clone(&handler); - let default_handler = Arc::clone(&default_handler); - let error_handler = Arc::clone(&error_handler); + tokio::spawn(ReceiverStream::new(rx).for_each_concurrent(None, move |update| { + let deps = Arc::clone(&deps); + let handler = Arc::clone(&handler); + let default_handler = Arc::clone(&default_handler); + let error_handler = Arc::clone(&error_handler); - async move { - handle_update(update, deps, handler, default_handler, error_handler).await; - } - }) - .await; - }); + handle_update(update, deps, handler, default_handler, error_handler) + })); tx } From 579d5a7b7ca8d8763e3c8c387a22e247a16bb488 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 14:05:46 +0600 Subject: [PATCH 06/10] Wait for workers by `.await`ing join handles --- src/dispatching/dispatcher.rs | 36 ++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 60771555..12ff8b90 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -102,9 +102,9 @@ pub struct Dispatcher { default_handler: DefaultHandler, // Tokio TX channel parts associated with chat IDs that consume updates sequentially. - workers: HashMap, + workers: HashMap, // The default TX part that consume updates concurrently. - default_worker: Option, + default_worker: Option, error_handler: Arc + Send + Sync>, // TODO: respect allowed_udpates @@ -113,7 +113,10 @@ pub struct Dispatcher { state: ShutdownToken, } -type WorkerTx = tokio::sync::mpsc::Sender; +struct Worker { + tx: tokio::sync::mpsc::Sender, + handle: tokio::task::JoinHandle<()>, +} // TODO: it is allowed to return message as response on telegram request in // webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates @@ -225,8 +228,15 @@ where } } - self.workers.drain(); - self.default_worker.take(); + for (_chat_id, worker) in self.workers.drain() { + drop(worker.tx); + worker.handle.await.expect("Unable to wait for a worker"); + } + if let Some(worker) = self.default_worker.take() { + drop(worker.tx); + worker.handle.await.expect("Unable to wait for a default handler"); + } + self.state.done(); } @@ -254,7 +264,7 @@ where let default_handler = Arc::clone(&self.default_handler); let error_handler = Arc::clone(&self.error_handler); - let tx = match upd.chat() { + let worker = match upd.chat() { Some(chat) => self.workers.entry(chat.id).or_insert_with(|| { spawn_worker(deps, handler, default_handler, error_handler) }), @@ -263,7 +273,7 @@ where }), }; - tx.send(upd).await.expect("TX is dead"); + worker.tx.send(upd).await.expect("TX is dead"); } Err(err) => err_handler.clone().handle_error(err).await, } @@ -309,7 +319,7 @@ fn spawn_worker( handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, -) -> WorkerTx +) -> Worker where Err: Send + Sync + 'static, { @@ -317,7 +327,7 @@ where let deps = Arc::new(deps); - tokio::spawn(ReceiverStream::new(rx).for_each(move |update| { + let handle = tokio::spawn(ReceiverStream::new(rx).for_each(move |update| { let deps = Arc::clone(&deps); let handler = Arc::clone(&handler); let default_handler = Arc::clone(&default_handler); @@ -326,7 +336,7 @@ where handle_update(update, deps, handler, default_handler, error_handler) })); - tx + Worker { tx, handle } } fn spawn_default_worker( @@ -334,7 +344,7 @@ fn spawn_default_worker( handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, -) -> WorkerTx +) -> Worker where Err: Send + Sync + 'static, { @@ -342,7 +352,7 @@ where let deps = Arc::new(deps); - tokio::spawn(ReceiverStream::new(rx).for_each_concurrent(None, move |update| { + let handle = tokio::spawn(ReceiverStream::new(rx).for_each_concurrent(None, move |update| { let deps = Arc::clone(&deps); let handler = Arc::clone(&handler); let default_handler = Arc::clone(&default_handler); @@ -351,7 +361,7 @@ where handle_update(update, deps, handler, default_handler, error_handler) })); - tx + Worker { tx, handle } } async fn handle_update( From 556068fdb118d5c86b3940cd35f499fbb701a254 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 16:09:00 +0600 Subject: [PATCH 07/10] Wait for workers concurrently Co-authored-by: Waffle Maybe --- src/dispatching/dispatcher.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 12ff8b90..9ec6b2a3 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -228,14 +228,12 @@ where } } - for (_chat_id, worker) in self.workers.drain() { - drop(worker.tx); - worker.handle.await.expect("Unable to wait for a worker"); - } - if let Some(worker) = self.default_worker.take() { - drop(worker.tx); - worker.handle.await.expect("Unable to wait for a default handler"); - } + self.workers.drain() + .map(|(_chat_id, worker)| worker.handle) + .chain(self.default_worker.take().map(|worker| worker.handle)) + .collect::>() + .for_each(|()| ()) + .await; self.state.done(); } From 2e8ed1fa3c4ad1d8d9788e942c90119557128ec7 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 16:10:41 +0600 Subject: [PATCH 08/10] Remove redundant `Arc::clone` Co-authored-by: Waffle Maybe --- src/dispatching/dispatcher.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index 9ec6b2a3..a7a20ddb 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -78,9 +78,9 @@ where Dispatcher { bot: self.bot.clone(), dependencies: self.dependencies, - handler: Arc::clone(&self.handler), - default_handler: Arc::clone(&self.default_handler), - error_handler: Arc::clone(&self.error_handler), + handler: self.handler, + default_handler: self.default_handler, + error_handler: self.error_handler, allowed_updates: Default::default(), state: ShutdownToken::new(), workers: HashMap::new(), From b2ad29702921ee52f52bae42b22dd576189202a8 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 16:11:47 +0600 Subject: [PATCH 09/10] Fix formatting --- src/dispatching/dispatcher.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index a7a20ddb..e556e1b0 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -228,9 +228,10 @@ where } } - self.workers.drain() + self.workers + .drain() .map(|(_chat_id, worker)| worker.handle) - .chain(self.default_worker.take().map(|worker| worker.handle)) + .chain(self.default_worker.take().map(|worker| worker.handle)) .collect::>() .for_each(|()| ()) .await; From 71c647b2c99a1ae1cc810ee6578ec3d46f2669c5 Mon Sep 17 00:00:00 2001 From: Hirrolot Date: Wed, 13 Apr 2022 16:18:02 +0600 Subject: [PATCH 10/10] Fix `.for_each` on `self.workers` --- src/dispatching/dispatcher.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index e556e1b0..fd194430 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -9,7 +9,7 @@ use crate::{ }; use dptree::di::{DependencyMap, DependencySupplier}; -use futures::{future::BoxFuture, StreamExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use std::{ collections::{HashMap, HashSet}, fmt::Debug, @@ -233,7 +233,9 @@ where .map(|(_chat_id, worker)| worker.handle) .chain(self.default_worker.take().map(|worker| worker.handle)) .collect::>() - .for_each(|()| ()) + .for_each(|res| async { + res.expect("Failed to wait for a worker."); + }) .await; self.state.done();