Many improvements of Dispatcher, also made many dispatcher::* methods and types pub(crate) for using in dispatching2 module

This commit is contained in:
p0lunin 2021-11-25 12:59:54 +02:00
parent 887b24d27f
commit 14083c8300
6 changed files with 261 additions and 99 deletions

View file

@ -75,7 +75,7 @@ teloxide-macros = { version = "0.4", optional = true }
serde_json = "1.0" serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
dptree = { git = "https://github.com/p0lunin/dptree", branch = "kiss3" } dptree = { git = "https://github.com/p0lunin/dptree" }
tokio = { version = "1.8", features = ["fs"] } tokio = { version = "1.8", features = ["fs"] }
tokio-util = "0.6" tokio-util = "0.6"

View file

@ -544,8 +544,8 @@ impl std::error::Error for IdleShutdownError {}
/// A token which used to shutdown [`Dispatcher`]. /// A token which used to shutdown [`Dispatcher`].
#[derive(Clone)] #[derive(Clone)]
pub struct ShutdownToken { pub struct ShutdownToken {
dispatcher_state: Arc<DispatcherState>, pub(crate) dispatcher_state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>, pub(crate) shutdown_notify_back: Arc<Notify>,
} }
impl ShutdownToken { impl ShutdownToken {
@ -566,20 +566,20 @@ impl ShutdownToken {
} }
} }
struct DispatcherState { pub(crate) struct DispatcherState {
inner: AtomicU8, inner: AtomicU8,
} }
impl DispatcherState { impl DispatcherState {
fn load(&self) -> ShutdownState { pub(crate) fn load(&self) -> ShutdownState {
ShutdownState::from_u8(self.inner.load(Ordering::SeqCst)) ShutdownState::from_u8(self.inner.load(Ordering::SeqCst))
} }
fn store(&self, new: ShutdownState) { pub(crate) fn store(&self, new: ShutdownState) {
self.inner.store(new as _, Ordering::SeqCst) self.inner.store(new as _, Ordering::SeqCst)
} }
fn compare_exchange( pub(crate) fn compare_exchange(
&self, &self,
current: ShutdownState, current: ShutdownState,
new: ShutdownState, new: ShutdownState,
@ -599,7 +599,7 @@ impl Default for DispatcherState {
#[repr(u8)] #[repr(u8)]
#[derive(Debug)] #[derive(Debug)]
enum ShutdownState { pub(crate) enum ShutdownState {
Running, Running,
ShuttingDown, ShuttingDown,
Idle, Idle,
@ -620,21 +620,17 @@ impl ShutdownState {
} }
} }
fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Duration { pub(crate) fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Duration {
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1);
const DZERO: Duration = Duration::ZERO;
// FIXME: replace this by just Duration::ZERO once 1.53 will be released
const DZERO: Duration = Duration::from_secs(0);
let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO);
shutdown_check_timeout.saturating_add(MIN_SHUTDOWN_CHECK_TIMEOUT)
// FIXME: replace this by just saturating_add once 1.53 will be released
shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout)
} }
struct AlreadyShuttingDown; pub(crate) struct AlreadyShuttingDown;
fn shutdown_inner( pub(crate) fn shutdown_inner(
state: &DispatcherState, state: &DispatcherState,
) -> Result<(), Result<AlreadyShuttingDown, IdleShutdownError>> { ) -> Result<(), Result<AlreadyShuttingDown, IdleShutdownError>> {
use ShutdownState::*; use ShutdownState::*;

View file

@ -62,6 +62,10 @@ pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt;
use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedReceiver;
pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType}; pub use update_with_cx::{UpdateWithCx, UpdateWithCxRequesterType};
pub(crate) use dispatcher::{
shutdown_check_timeout_for, shutdown_inner, DispatcherState, ShutdownState,
};
/// A type of a stream, consumed by [`Dispatcher`]'s handlers. /// A type of a stream, consumed by [`Dispatcher`]'s handlers.
/// ///
/// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`Dispatcher`]: crate::dispatching::Dispatcher

View file

@ -1,44 +1,264 @@
use crate::{ use crate::{
dispatching2::handlers::{Handlers, Replaced, UpdateHandler}, dispatching::{
types, shutdown_check_timeout_for, shutdown_inner, stop_token::StopToken, update_listeners,
types::Update, update_listeners::UpdateListener, DispatcherState, ShutdownToken,
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
requests::Requester,
types::{AllowedUpdate, Update, UpdateKind},
}; };
use dptree::{di::DependencySupplier, Replace}; use dptree::di::DependencyMap;
use futures::StreamExt;
use std::{collections::HashSet, convert::Infallible, fmt::Debug, ops::ControlFlow, sync::Arc};
use tokio::{sync::Notify, time::timeout};
pub struct Dispatcher<C, Err> pub struct Dispatcher<R, Err> {
where requester: R,
C: Replace<Update, types::Message>, dependencies: DependencyMap,
{
handlers: Handlers<C, Err>, handler: UpdateHandler<Err>,
default_handler: DefaultHandler,
allowed_updates: HashSet<AllowedUpdate>,
state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
} }
impl<C, IR, Err> Dispatcher<C, Err> // TODO: it is allowed to return message as response on telegram request in
// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates
// FIXME: remove 'static lifetime?
pub type UpdateHandler<Err> = dptree::Handler<'static, DependencyMap, Result<(), Err>>;
pub type DefaultHandler = dptree::Handler<'static, DependencyMap, (), Infallible>;
macro_rules! make_parser {
($kind:ident) => {
dptree::parser(|upd: &Update| match &upd.kind {
UpdateKind::$kind(u) => Some(u.clone()),
_ => None,
})
};
}
impl<R, Err> Dispatcher<R, Err>
where where
C: DependencySupplier<Update> R: 'static,
+ Send
+ Sync
+ 'static
+ Replace<Update, types::Message, Out = IR>,
IR: Send + Sync + 'static + Replace<types::Message, Update, Out = C>,
Err: Send + Sync + 'static, Err: Send + Sync + 'static,
{ {
pub fn new() -> Self { pub fn new(requester: R) -> Self {
Dispatcher { handlers: Handlers::new() } Dispatcher {
requester,
dependencies: DependencyMap::new(),
handler: dptree::entry(),
default_handler: dptree::endpoint(|update: Arc<Update>| async move {
log::warn!("Unhandled update: {:?}", update.as_ref())
}),
allowed_updates: Default::default(),
state: Arc::new(Default::default()),
shutdown_notify_back: Arc::new(Default::default()),
}
}
/// Setup the `^C` handler which [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
#[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))]
pub fn setup_ctrlc_handler(self) -> Self {
let state = Arc::clone(&self.state);
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
match shutdown_inner(&state) {
Ok(()) => log::info!("^C received, trying to shutdown the dispatcher..."),
Err(Ok(_)) => {
log::info!(
"^C received, the dispatcher is already shutting down, ignoring the \
signal"
)
}
Err(Err(_)) => {
log::info!("^C received, the dispatcher isn't running, ignoring the signal")
}
}
}
});
self
}
/// Returns a shutdown token, which can later be used to shutdown
/// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
ShutdownToken {
dispatcher_state: Arc::clone(&self.state),
shutdown_notify_back: Arc::clone(&self.shutdown_notify_back),
}
}
/// Starts your bot with the default parameters.
///
/// The default parameters are a long polling update listener and log all
/// errors produced by this listener).
///
/// Please note that after shutting down (either because of [`shutdown`],
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch(&mut self)
where
R: Requester + Clone,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let listener = update_listeners::polling_default(self.requester.clone()).await;
let error_handler =
LoggingErrorHandler::with_custom_text("An error from the update listener");
self.dispatch_with_listener(listener, error_handler).await;
}
/// Starts your bot with custom `update_listener` and
/// `update_listener_error_handler`.
///
/// Please note that after shutting down (either because of [`shutdown`],
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a mut self,
mut update_listener: UListener,
update_listener_error_handler: Arc<Eh>,
) where
UListener: UpdateListener<ListenerE> + 'a,
Eh: ErrorHandler<ListenerE> + 'a,
ListenerE: Debug,
R: Requester + Clone,
{
use crate::dispatching::ShutdownState::*;
update_listener.hint_allowed_updates(&mut self.allowed_updates.clone().into_iter());
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token());
if let Err(actual) = self.state.compare_exchange(Idle, Running) {
unreachable!(
"Dispatching is already running: expected `{:?}` state, found `{:?}`",
Idle, actual
);
}
{
let stream = update_listener.as_stream();
tokio::pin!(stream);
loop {
// False positive
#[allow(clippy::collapsible_match)]
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
match upd {
None => break,
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
}
}
if let ShuttingDown = self.state.load() {
if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching...");
token.stop();
break;
}
}
}
}
if let ShuttingDown = self.state.load() {
// Stopped because of a `shutdown` call.
// Notify `shutdown`s that we finished
self.shutdown_notify_back.notify_waiters();
log::info!("Dispatching has been shut down.");
} else {
log::info!("Dispatching has been stopped (listener returned `None`).");
}
self.state.store(Idle);
}
async fn process_update<LErr, LErrHandler>(
&self,
update: Result<Update, LErr>,
err_handler: &Arc<LErrHandler>,
) where
LErrHandler: ErrorHandler<LErr>,
{
match update {
Ok(upd) => {
let mut deps = self.dependencies.clone();
deps.insert(upd);
match self.handler.dispatch(deps).await {
ControlFlow::Break(Ok(())) => {}
ControlFlow::Break(Err(_err)) => todo!("error handler"),
ControlFlow::Continue(deps) => {
match self
.default_handler
.clone()
.execute(deps, |next| async move { match next {} })
.await
{
ControlFlow::Break(()) => {}
ControlFlow::Continue(_) => unreachable!(
"This is unreachable due to Infallible type in the DefaultHandler \
type"
),
}
}
}
}
Err(err) => err_handler.clone().handle_error(err).await,
}
}
pub fn handler(self, handler: UpdateHandler<Err>) -> Self {
Dispatcher { handler: self.handler.branch(handler), ..self }
}
// Specify handler that will be called if other handlers was not handle the
// update.
pub fn default_handler(self, handler: UpdateHandler<Err>) -> Self {
Dispatcher { handler: self.handler.branch(handler), ..self }
}
// Specify dependencies that can be used inside of handlers.
pub fn dependencies(self, dependencies: DependencyMap) -> Self {
Dispatcher { dependencies, ..self }
} }
pub fn message_handler( pub fn message_handler(
mut self, mut self,
handler: UpdateHandler<Replaced<C, types::Message>, Err>, make_handler: impl FnOnce(UpdateHandler<Err>) -> UpdateHandler<Err>,
) -> Self { ) -> Self {
self.handlers.message_handler(handler); self.allowed_updates.insert(AllowedUpdate::Message);
self
let parser = make_parser!(Message);
let handler = make_handler(parser);
self.handler(handler)
} }
pub fn edited_message_handler( pub fn edited_message_handler(
mut self, mut self,
handler: UpdateHandler<Replaced<C, types::Message>, Err>, make_handler: impl FnOnce(UpdateHandler<Err>) -> UpdateHandler<Err>,
) -> Self { ) -> Self {
self.handlers.edited_message_handler(handler); self.allowed_updates.insert(AllowedUpdate::EditedMessage);
self
let parser = make_parser!(EditedMessage);
let handler = make_handler(parser);
self.handler(handler)
} }
} }

View file

@ -1,57 +0,0 @@
use crate::{
types,
types::{Update, UpdateKind},
};
use dptree::{di::DependencySupplier, Handler, Replace};
pub type Replaced<C, T2> = <C as Replace<Update, T2>>::Out;
pub struct Handlers<C, Err>
where
C: Replace<Update, types::Message>,
{
message_handler: UpdateHandler<C, Err, Replaced<C, types::Message>>,
edited_message_handler: UpdateHandler<C, Err, Replaced<C, types::Message>>,
}
macro_rules! new_handler {
($kind:ident) => {
dptree::parser(|upd: &Update| match &upd.kind {
UpdateKind::$kind(u) => Some(u.clone()),
_ => None,
})
};
}
impl<C, IR, Err> Handlers<C, Err>
where
C: DependencySupplier<Update>
+ Send
+ Sync
+ 'static
+ Replace<Update, types::Message, Out = IR>,
IR: Send + Sync + 'static + Replace<types::Message, Update, Out = C>,
Err: Send + Sync + 'static,
{
pub fn new() -> Self {
Handlers {
message_handler: new_handler!(Message),
edited_message_handler: new_handler!(EditedMessage),
}
}
pub fn message_handler(&mut self, handler: UpdateHandler<Replaced<C, types::Message>, Err>) {
self.message_handler = self.message_handler.clone().branch(handler);
}
pub fn edited_message_handler(
&mut self,
handler: UpdateHandler<Replaced<C, types::Message>, Err>,
) {
self.edited_message_handler = self.edited_message_handler.clone().branch(handler);
}
}
// TODO: it is allowed to return message as answer on telegram request in
// webhooks, so we can allow this too. See more there: https://core.telegram.org/bots/api#making-requests-when-getting-updates
pub type UpdateHandler<C, Err, IR = C> = Handler<'static, C, Result<(), Err>, IR>;

View file

@ -1,4 +1,3 @@
mod dispatcher; mod dispatcher;
mod handlers;
pub use dispatcher::Dispatcher; pub use dispatcher::Dispatcher;