mirror of https://github.com/teloxide/teloxide.git

commit 9cb7ca9bd3 (parent 074fd4e343)

Add "GC" for dispatcher workers

2 changed files with 85 additions and 13 deletions
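This commit teaches the dispatcher to garbage-collect idle per-chat workers. A minimal usage sketch of the new builder knob follows; only `gc_worker_count_trigger` and the pre-existing `worker_queue_size` come from this diff, while the bot and handler wiring assumes the surrounding teloxide dispatching API and is illustrative only:

use teloxide::prelude::*;

#[tokio::main]
async fn main() {
    // Illustrative wiring; only the two builder knobs below come from this diff.
    let bot = Bot::from_env();

    let handler = Update::filter_message().endpoint(|bot: Bot, msg: Message| async move {
        bot.send_message(msg.chat.id, "pong").await?;
        respond(())
    });

    Dispatcher::builder(bot, handler)
        // Pre-existing knob: per-worker update queue size (default 64).
        .worker_queue_size(64)
        // New in this commit: once more than this many per-chat workers exist,
        // idle ones are garbage-collected on the next pass of the dispatch
        // loop (default 32).
        .gc_worker_count_trigger(16)
        .build()
        .dispatch()
        .await;
}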
Changed file 1 of 2 (the dispatcher module; file paths are not preserved in this view):

@@ -16,7 +16,10 @@ use std::{
     fmt::Debug,
     hash::Hash,
     ops::{ControlFlow, Deref},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
 };
 use tokio::time::timeout;
 use tokio_stream::wrappers::ReceiverStream;
@@ -32,6 +35,7 @@ pub struct DispatcherBuilder<R, Err, Key> {
     error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
     distribution_f: fn(&Update) -> Option<Key>,
     worker_queue_size: usize,
+    gc_worker_count_trigger: usize,
 }
 
 impl<R, Err, Key> DispatcherBuilder<R, Err, Key>
@@ -83,6 +87,17 @@ where
         Self { worker_queue_size: size, ..self }
     }
 
+    /// Maximum number of inactive workers.
+    ///
+    /// When number of workers exceeds this limit dispatcher will try to remove
+    /// inactive workers.
+    ///
+    /// By default it's 32.
+    #[must_use]
+    pub fn gc_worker_count_trigger(self, count: usize) -> Self {
+        Self { gc_worker_count_trigger: count, ..self }
+    }
+
     /// Specifies the distribution function that decides how updates are grouped
     /// before execution.
     pub fn distribution_function<K>(
@@ -100,6 +115,7 @@ where
             error_handler,
             distribution_f: _,
             worker_queue_size,
+            gc_worker_count_trigger: worker_count_gc,
         } = self;
 
         DispatcherBuilder {
@@ -110,6 +126,7 @@ where
             error_handler,
             distribution_f: f,
             worker_queue_size,
+            gc_worker_count_trigger: worker_count_gc,
         }
     }
 
@@ -124,6 +141,7 @@ where
             error_handler,
             distribution_f,
             worker_queue_size,
+            gc_worker_count_trigger,
         } = self;
 
         Dispatcher {
@@ -137,6 +155,7 @@ where
             worker_queue_size,
             workers: HashMap::new(),
             default_worker: None,
+            gc_worker_count_trigger,
         }
     }
 }
@@ -158,6 +177,7 @@ pub struct Dispatcher<R, Err, Key> {
 
     distribution_f: fn(&Update) -> Option<Key>,
     worker_queue_size: usize,
+    gc_worker_count_trigger: usize,
     // Tokio TX channel parts associated with chat IDs that consume updates sequentially.
     workers: HashMap<Key, Worker>,
     // The default TX part that consume updates concurrently.
@@ -171,6 +191,7 @@ pub struct Dispatcher<R, Err, Key> {
 struct Worker {
     tx: tokio::sync::mpsc::Sender<Update>,
     handle: tokio::task::JoinHandle<()>,
+    is_waiting: Arc<AtomicBool>,
 }
 
 // TODO: it is allowed to return message as response on telegram request in
@@ -194,6 +215,7 @@ where
         Err: Debug,
     {
        const DEFAULT_WORKER_QUEUE_SIZE: usize = 64;
+       const DEFAULT_GC_WORKER_COUNT_TRIGGER: usize = 32;
 
        DispatcherBuilder {
            bot,
@@ -206,6 +228,7 @@ where
            error_handler: LoggingErrorHandler::new(),
            worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
            distribution_f: default_distribution_function,
+           gc_worker_count_trigger: DEFAULT_GC_WORKER_COUNT_TRIGGER,
        }
    }
 }
@@ -214,7 +237,7 @@ impl<R, Err, Key> Dispatcher<R, Err, Key>
 where
     R: Requester + Clone + Send + Sync + 'static,
     Err: Send + Sync + 'static,
-    Key: Hash + Eq,
+    Key: Hash + Eq + Clone,
 {
     /// Starts your bot with the default parameters.
     ///
@@ -280,6 +303,8 @@ where
         tokio::pin!(stream);
 
         loop {
+            self.gc_workers_if_needed().await;
+
             // False positive
             #[allow(clippy::collapsible_match)]
             if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
@@ -367,6 +392,45 @@ where
         }
     }
 
+    async fn gc_workers_if_needed(&mut self) {
+        if self.workers.len() <= self.gc_worker_count_trigger {
+            return;
+        }
+
+        self.gc_workers().await;
+    }
+
+    #[inline(never)]
+    async fn gc_workers(&mut self) {
+        let handles = self
+            .workers
+            .iter()
+            .filter(|(_, worker)| {
+                worker.tx.capacity() == self.worker_queue_size
+                    && worker.is_waiting.load(Ordering::Relaxed)
+            })
+            .map(|(k, _)| k)
+            .cloned()
+            .collect::<Vec<_>>()
+            .into_iter()
+            .map(|key| {
+                let Worker { tx, handle, .. } = self.workers.remove(&key).unwrap();
+
+                // Close channel, worker should stop almost immediately
+                // (it's been supposedly waiting on the channel)
+                drop(tx);
+
+                handle
+            });
+
+        for handle in handles {
+            // We must wait for worker to stop anyway, even though it should stop
+            // immediately. This helps in case if we've checked that the worker
+            // is waiting in between it received the update and set the flag.
+            let _ = handle.await;
+        }
+    }
+
     /// Setups the `^C` handler that [`shutdown`]s dispatching.
     ///
     /// [`shutdown`]: ShutdownToken::shutdown
@@ -410,20 +474,28 @@ fn spawn_worker<Err>(
 where
     Err: Send + Sync + 'static,
 {
-    let (tx, rx) = tokio::sync::mpsc::channel(queue_size);
+    let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size);
+    let is_waiting = Arc::new(AtomicBool::new(true));
+    let is_waiting_local = Arc::clone(&is_waiting);
 
     let deps = Arc::new(deps);
 
-    let handle = tokio::spawn(ReceiverStream::new(rx).for_each(move |update| {
-        let deps = Arc::clone(&deps);
-        let handler = Arc::clone(&handler);
-        let default_handler = Arc::clone(&default_handler);
-        let error_handler = Arc::clone(&error_handler);
-
-        handle_update(update, deps, handler, default_handler, error_handler)
-    }));
+    let handle = tokio::spawn(async move {
+        while let Some(update) = rx.recv().await {
+            is_waiting_local.store(false, Ordering::Relaxed);
+
+            let deps = Arc::clone(&deps);
+            let handler = Arc::clone(&handler);
+            let default_handler = Arc::clone(&default_handler);
+            let error_handler = Arc::clone(&error_handler);
+
+            handle_update(update, deps, handler, default_handler, error_handler).await;
+
+            is_waiting_local.store(true, Ordering::Relaxed);
+        }
+    });
 
-    Worker { tx, handle }
+    Worker { tx, handle, is_waiting }
 }
 
 fn spawn_default_worker<Err>(
@@ -449,7 +521,7 @@ where
         handle_update(update, deps, handler, default_handler, error_handler)
     }));
 
-    Worker { tx, handle }
+    Worker { tx, handle, is_waiting: Arc::new(AtomicBool::new(true)) }
 }
 
 async fn handle_update<Err>(
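In short: each worker flips an `is_waiting` flag around every `handle_update` call, and the GC pass treats a worker as inactive only when that flag is set and its queue is empty (`tx.capacity() == worker_queue_size`). Dropping the worker's `Sender` closes the channel, so `rx.recv()` returns `None` and the loop exits; the `JoinHandle` is awaited anyway in case the flag check raced with an update that was already received. Collecting the keys before removal is also why the `Key` bound gains `+ Clone` above, and why `DefaultKey` derives `Clone` in the second file below. A self-contained sketch of the same pattern outside teloxide (the names, the `u32` job/key types, and the queue size are illustrative, not from this commit):

use std::collections::HashMap;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

struct Worker {
    tx: tokio::sync::mpsc::Sender<u32>,
    handle: tokio::task::JoinHandle<()>,
    is_waiting: Arc<AtomicBool>,
}

fn spawn_worker(queue_size: usize) -> Worker {
    let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size);
    let is_waiting = Arc::new(AtomicBool::new(true));
    let is_waiting_local = Arc::clone(&is_waiting);

    let handle = tokio::spawn(async move {
        // `recv` yields `None` once every `Sender` is dropped; that is
        // exactly how the GC pass below shuts a worker down.
        while let Some(job) = rx.recv().await {
            is_waiting_local.store(false, Ordering::Relaxed);
            println!("handling job {job}");
            is_waiting_local.store(true, Ordering::Relaxed);
        }
    });

    Worker { tx, handle, is_waiting }
}

async fn gc_workers(workers: &mut HashMap<u32, Worker>, queue_size: usize) {
    // Inactive = empty queue (full capacity available) and parked in `recv`.
    let idle: Vec<u32> = workers
        .iter()
        .filter(|(_, w)| w.tx.capacity() == queue_size && w.is_waiting.load(Ordering::Relaxed))
        .map(|(k, _)| *k)
        .collect();

    for key in idle {
        let Worker { tx, handle, .. } = workers.remove(&key).unwrap();
        // Close the channel; the worker loop ends on `None`.
        drop(tx);
        // Await anyway, in case the flag check raced with an in-flight job.
        let _ = handle.await;
    }
}

#[tokio::main]
async fn main() {
    const QUEUE: usize = 4;
    let mut workers = HashMap::new();
    workers.insert(1, spawn_worker(QUEUE));
    workers.insert(2, spawn_worker(QUEUE));

    workers[&1].tx.send(42).await.unwrap();
    // Give worker 1 a moment to drain its queue and park again.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    gc_workers(&mut workers, QUEUE).await;
    assert!(workers.is_empty());
}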
Changed file 2 of 2 (the default distribution key module):

@@ -1,7 +1,7 @@
 use teloxide_core::types::{ChatId, Update};
 
 /// Default distribution key for dispatching.
-#[derive(Debug, Hash, PartialEq, Eq)]
+#[derive(Debug, Hash, PartialEq, Eq, Clone)]
 pub struct DefaultKey(ChatId);
 
 pub(crate) fn default_distribution_function(update: &Update) -> Option<DefaultKey> {