Merge pull request #385 from teloxide/graceful_shutdown

Graceful shutdown & co
This commit is contained in:
Hirrolot 2021-06-27 16:41:44 +06:00 committed by GitHub
commit 3441ef46c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 896 additions and 296 deletions

View file

@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Storage::get_dialogue` to obtain a dialogue indexed by a chat ID.
- `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`.
- `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`.
- A way to `shutdown` dispatcher
- `Dispatcher::shutdown_token` function.
- `ShutdownToken` with a `shutdown` function.
- `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)).
- `IdleShutdownError`
### Changed
@ -18,6 +23,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Return an error from `Storage::remove_dialogue` if a dialogue does not exist.
- Require `D: Clone` in `dialogues_repl(_with_listener)` and `InMemStorage`.
- Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)).
- `polling` and `polling_default` now require `R: 'static`
- Refactor `UpdateListener` trait:
- Add a `StopToken` associated type.
- It must implement a new `StopToken` trait which has the only function `fn stop(self);`
- Add a `stop_token` function that returns `Self::StopToken` and allows stopping the listener later ([issue 166](https://github.com/teloxide/teloxide/issues/166)).
- Remove blanked implementation.
- Remove `Stream` from super traits.
- Add `AsUpdateStream` to super traits.
- Add an `AsUpdateStream` trait that allows turning implementors into streams of updates (GAT workaround).
- Add a `timeout_hint` function (with a default implementation).
- `Dispatcher::dispatch` and `Dispatcher::dispatch_with_listener` now require mutable reference to self.
- Repls can now be stopped by `^C` signal.
- `Noop` and `AsyncStopToken`stop tokens.
- `StatefulListener`.
- Emit not only errors but also warnings and general information from teloxide, when set up by `enable_logging!`.
### Fixed

View file

@ -24,7 +24,7 @@ authors = [
maintenance = { status = "actively-developed" }
[features]
default = ["native-tls", "teloxide-core/default"]
default = ["native-tls", "ctrlc_handler", "teloxide-core/default"]
sqlite-storage = ["sqlx"]
redis-storage = ["redis"]
@ -34,6 +34,8 @@ bincode-serializer = ["bincode"]
frunk- = ["frunk"]
macros = ["teloxide-macros"]
ctrlc_handler = ["tokio/signal"]
native-tls = ["teloxide-core/native-tls"]
rustls = ["teloxide-core/rustls"]
auto-send = ["teloxide-core/auto_send"]
@ -51,6 +53,7 @@ full = [
"bincode-serializer",
"frunk",
"macros",
"ctrlc_handler",
"teloxide-core/full",
"native-tls",
"rustls",

View file

@ -1,7 +1,7 @@
// The version of Heroku ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling.
use teloxide::{dispatching::update_listeners, prelude::*, types::Update};
use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use std::{convert::Infallible, env, net::SocketAddr};
use tokio::sync::mpsc;
@ -20,8 +20,8 @@ async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, In
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// Heroku defines auto defines a port value
pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// Heroku auto defines a port value
let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing");
let port: u16 = env::var("PORT")
.expect("PORT env variable missing")
@ -48,11 +48,21 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
})
.recover(handle_rejection);
let serve = warp::serve(server);
let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let address = format!("0.0.0.0:{}", port);
tokio::spawn(serve.run(address.parse::<SocketAddr>().unwrap()));
UnboundedReceiverStream::new(rx)
let addr = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
let server = warp::serve(server);
let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate.
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone())
}
async fn run() {

View file

@ -1,7 +1,7 @@
// The version of ngrok ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling.
use teloxide::{dispatching::update_listeners, prelude::*, types::Update};
use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use std::{convert::Infallible, net::SocketAddr};
use tokio::sync::mpsc;
@ -20,10 +20,10 @@ async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, In
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// You might want to specify a self-signed certificate via .certificate
// method on SetWebhook.
bot.set_webhook("Your HTTPS ngrok URL here. Get it by 'ngrok http 80'")
bot.set_webhook("Your HTTPS ngrok URL here. Get it by `ngrok http 80`")
.await
.expect("Cannot setup a webhook");
@ -40,13 +40,21 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
})
.recover(handle_rejection);
let serve = warp::serve(server);
let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let addr = "127.0.0.1:80".parse::<SocketAddr>().unwrap();
let server = warp::serve(server);
let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate.
tokio::spawn(serve.run("127.0.0.1:80".parse::<SocketAddr>().unwrap()));
UnboundedReceiverStream::new(rx)
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone())
}
async fn run() {

View file

@ -1,48 +1,37 @@
use std::{
fmt::{self, Debug},
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
};
use crate::{
dispatching::{
update_listeners, update_listeners::UpdateListener, DispatcherHandler, UpdateWithCx,
stop_token::StopToken,
update_listeners::{self, UpdateListener},
DispatcherHandler, UpdateWithCx,
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
};
use futures::StreamExt;
use std::{fmt::Debug, sync::Arc};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use teloxide_core::{
requests::Requester,
types::{
CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll,
PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind,
PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind,
},
};
use tokio::sync::mpsc;
use tokio::{
sync::{mpsc, Notify},
task::JoinHandle,
time::timeout,
};
type Tx<Upd, R> = Option<mpsc::UnboundedSender<UpdateWithCx<Upd, R>>>;
#[macro_use]
mod macros {
/// Pushes an update to a queue.
macro_rules! send {
($requester:expr, $tx:expr, $update:expr, $variant:expr) => {
send($requester, $tx, $update, stringify!($variant));
};
}
}
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
}
}
/// One dispatcher to rule them all.
///
/// See the [module-level documentation](crate::dispatching) for the design
@ -63,6 +52,11 @@ pub struct Dispatcher<R> {
poll_answers_queue: Tx<R, PollAnswer>,
my_chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_members_queue: Tx<R, ChatMemberUpdated>,
running_handlers: FuturesUnordered<JoinHandle<()>>,
state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
}
impl<R> Dispatcher<R>
@ -87,25 +81,48 @@ where
poll_answers_queue: None,
my_chat_members_queue: None,
chat_members_queue: None,
running_handlers: FuturesUnordered::new(),
state: <_>::default(),
shutdown_notify_back: <_>::default(),
}
}
#[must_use]
#[allow(clippy::unnecessary_wraps)]
fn new_tx<H, Upd>(&self, h: H) -> Tx<R, Upd>
fn new_tx<H, Upd>(&mut self, h: H) -> Tx<R, Upd>
where
H: DispatcherHandler<R, Upd> + Send + 'static,
Upd: Send + 'static,
R: Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let fut = h.handle(rx);
fut.await;
});
let join_handle = tokio::spawn(h.handle(rx));
self.running_handlers.push(join_handle);
Some(tx)
}
/// Setup the `^C` handler which [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
#[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))]
pub fn setup_ctrlc_handler(self) -> Self {
let state = Arc::clone(&self.state);
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
log::info!("^C received, trying to shutdown the dispatcher...");
// If dispatcher wasn't running, then there is nothing to do
shutdown_inner(&state).ok();
}
});
self
}
#[must_use]
pub fn messages_handler<H>(mut self, h: H) -> Self
where
@ -227,23 +244,39 @@ where
///
/// The default parameters are a long polling update listener and log all
/// errors produced by this listener).
pub async fn dispatch(&self)
///
/// Please note that after shutting down (either because of [`shutdown`],
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch(&mut self)
where
R: Requester + Clone,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
self.dispatch_with_listener(
update_listeners::polling_default(self.requester.clone()).await,
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
let listener = update_listeners::polling_default(self.requester.clone()).await;
let error_handler =
LoggingErrorHandler::with_custom_text("An error from the update listener");
self.dispatch_with_listener(listener, error_handler).await;
}
/// Starts your bot with custom `update_listener` and
/// `update_listener_error_handler`.
///
/// Please note that after shutting down (either because of [`shutdown`],
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a self,
update_listener: UListener,
&'a mut self,
mut update_listener: UListener,
update_listener_error_handler: Arc<Eh>,
) where
UListener: UpdateListener<ListenerE> + 'a,
@ -251,126 +284,308 @@ where
ListenerE: Debug,
R: Requester + Clone,
{
let update_listener = Box::pin(update_listener);
use ShutdownState::*;
update_listener
.for_each(move |update| {
let update_listener_error_handler = Arc::clone(&update_listener_error_handler);
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token());
async move {
if let Err(actual) = self.state.compare_exchange(Idle, Running) {
unreachable!(
"Dispatching is already running: expected `{:?}` state, found `{:?}`",
Idle, actual
);
}
{
let stream = update_listener.as_stream();
tokio::pin!(stream);
loop {
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
match upd {
None => break,
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
}
}
if let ShuttingDown = self.state.load() {
if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching...");
token.stop();
}
}
}
}
self.wait_for_handlers().await;
if let ShuttingDown = self.state.load() {
// Stopped because of a `shutdown` call.
// Notify `shutdown`s that we finished
self.shutdown_notify_back.notify_waiters();
log::info!("Dispatching has been shut down.");
} else {
log::info!("Dispatching has been stopped (listener returned `None`).");
}
self.state.store(Idle);
}
/// Returns a shutdown token, which can later be used to shutdown
/// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
ShutdownToken {
dispatcher_state: Arc::clone(&self.state),
shutdown_notify_back: Arc::clone(&self.shutdown_notify_back),
}
}
async fn process_update<ListenerE, Eh>(
&self,
update: Result<Update, ListenerE>,
update_listener_error_handler: &Arc<Eh>,
) where
R: Requester + Clone,
Eh: ErrorHandler<ListenerE>,
ListenerE: Debug,
{
{
log::trace!("Dispatcher received an update: {:?}", update);
let update = match update {
Ok(update) => update,
Err(error) => {
Arc::clone(&update_listener_error_handler).handle_error(error).await;
Arc::clone(update_listener_error_handler).handle_error(error).await;
return;
}
};
match update.kind {
UpdateKind::Message(message) => {
send!(
&self.requester,
&self.messages_queue,
message,
UpdateKind::Message
);
send(&self.requester, &self.messages_queue, message, "UpdateKind::Message")
}
UpdateKind::EditedMessage(message) => {
send!(
UpdateKind::EditedMessage(message) => send(
&self.requester,
&self.edited_messages_queue,
message,
UpdateKind::EditedMessage
);
}
UpdateKind::ChannelPost(post) => {
send!(
"UpdateKind::EditedMessage",
),
UpdateKind::ChannelPost(post) => send(
&self.requester,
&self.channel_posts_queue,
post,
UpdateKind::ChannelPost
);
}
UpdateKind::EditedChannelPost(post) => {
send!(
"UpdateKind::ChannelPost",
),
UpdateKind::EditedChannelPost(post) => send(
&self.requester,
&self.edited_channel_posts_queue,
post,
UpdateKind::EditedChannelPost
);
}
UpdateKind::InlineQuery(query) => {
send!(
"UpdateKind::EditedChannelPost",
),
UpdateKind::InlineQuery(query) => send(
&self.requester,
&self.inline_queries_queue,
query,
UpdateKind::InlineQuery
);
}
UpdateKind::ChosenInlineResult(result) => {
send!(
"UpdateKind::InlineQuery",
),
UpdateKind::ChosenInlineResult(result) => send(
&self.requester,
&self.chosen_inline_results_queue,
result,
UpdateKind::ChosenInlineResult
);
}
UpdateKind::CallbackQuery(query) => {
send!(
"UpdateKind::ChosenInlineResult",
),
UpdateKind::CallbackQuery(query) => send(
&self.requester,
&self.callback_queries_queue,
query,
UpdateKind::CallbackQuer
);
}
UpdateKind::ShippingQuery(query) => {
send!(
"UpdateKind::CallbackQuer",
),
UpdateKind::ShippingQuery(query) => send(
&self.requester,
&self.shipping_queries_queue,
query,
UpdateKind::ShippingQuery
);
}
UpdateKind::PreCheckoutQuery(query) => {
send!(
"UpdateKind::ShippingQuery",
),
UpdateKind::PreCheckoutQuery(query) => send(
&self.requester,
&self.pre_checkout_queries_queue,
query,
UpdateKind::PreCheckoutQuery
);
}
"UpdateKind::PreCheckoutQuery",
),
UpdateKind::Poll(poll) => {
send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll);
send(&self.requester, &self.polls_queue, poll, "UpdateKind::Poll")
}
UpdateKind::PollAnswer(answer) => {
send!(
UpdateKind::PollAnswer(answer) => send(
&self.requester,
&self.poll_answers_queue,
answer,
UpdateKind::PollAnswer
);
}
UpdateKind::MyChatMember(chat_member_updated) => {
send!(
"UpdateKind::PollAnswer",
),
UpdateKind::MyChatMember(chat_member_updated) => send(
&self.requester,
&self.my_chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
UpdateKind::ChatMember(chat_member_updated) => {
send!(
"UpdateKind::MyChatMember",
),
UpdateKind::ChatMember(chat_member_updated) => send(
&self.requester,
&self.chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
"UpdateKind::MyChatMember",
),
}
}
}
async fn wait_for_handlers(&mut self) {
log::debug!("Waiting for handlers to finish");
// Drop all senders, so handlers can stop
self.messages_queue.take();
self.edited_messages_queue.take();
self.channel_posts_queue.take();
self.edited_channel_posts_queue.take();
self.inline_queries_queue.take();
self.chosen_inline_results_queue.take();
self.callback_queries_queue.take();
self.shipping_queries_queue.take();
self.pre_checkout_queries_queue.take();
self.polls_queue.take();
self.poll_answers_queue.take();
self.my_chat_members_queue.take();
self.chat_members_queue.take();
// Wait untill all handlers finish
self.running_handlers.by_ref().for_each(|_| async {}).await;
}
}
/// This error is returned from [`ShutdownToken::shutdown`] when trying to
/// shutdown an idle [`Dispatcher`].
#[derive(Debug)]
pub struct IdleShutdownError;
impl fmt::Display for IdleShutdownError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Dispatcher was idle and as such couldn't be shut down")
}
}
impl std::error::Error for IdleShutdownError {}
/// A token which used to shutdown [`Dispatcher`].
#[derive(Clone)]
pub struct ShutdownToken {
dispatcher_state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
}
impl ShutdownToken {
/// Tries to shutdown dispatching.
///
/// Returns an error if the dispatcher is idle at the moment.
///
/// If you don't need to wait for shutdown, the returned future can be
/// ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, IdleShutdownError> {
shutdown_inner(&self.dispatcher_state).map(|()| async move {
log::info!("Trying to shutdown the dispatcher...");
self.shutdown_notify_back.notified().await
})
}
}
struct DispatcherState {
inner: AtomicU8,
}
impl DispatcherState {
fn load(&self) -> ShutdownState {
ShutdownState::from_u8(self.inner.load(Ordering::SeqCst))
}
fn store(&self, new: ShutdownState) {
self.inner.store(new as _, Ordering::SeqCst)
}
fn compare_exchange(
&self,
current: ShutdownState,
new: ShutdownState,
) -> Result<ShutdownState, ShutdownState> {
self.inner
.compare_exchange(current as _, new as _, Ordering::SeqCst, Ordering::SeqCst)
.map(ShutdownState::from_u8)
.map_err(ShutdownState::from_u8)
}
}
impl Default for DispatcherState {
fn default() -> Self {
Self { inner: AtomicU8::new(ShutdownState::Idle as _) }
}
}
#[repr(u8)]
#[derive(Debug)]
enum ShutdownState {
Running,
ShuttingDown,
Idle,
}
impl ShutdownState {
fn from_u8(n: u8) -> Self {
const RUNNING: u8 = ShutdownState::Running as u8;
const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8;
const IDLE: u8 = ShutdownState::Idle as u8;
match n {
RUNNING => ShutdownState::Running,
SHUTTING_DOWN => ShutdownState::ShuttingDown,
IDLE => ShutdownState::Idle,
_ => unreachable!(),
}
}
}
fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Duration {
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1);
// FIXME: replace this by just Duration::ZERO once 1.53 will be released
const DZERO: Duration = Duration::from_secs(0);
let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO);
// FIXME: replace this by just saturating_add once 1.53 will be released
shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout)
}
fn shutdown_inner(state: &DispatcherState) -> Result<(), IdleShutdownError> {
use ShutdownState::*;
let res = state.compare_exchange(Running, ShuttingDown);
match res {
Ok(_) | Err(ShuttingDown) => Ok(()),
Err(Idle) => Err(IdleShutdownError),
Err(Running) => unreachable!(),
}
}
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
}
}
})
.await
}
}

View file

@ -27,7 +27,7 @@
//! that:
//! - You are able to supply [`DialogueDispatcher`] as a handler.
//! - You are able to supply functions that accept
//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future<Output = ()`
//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future<Output = ()>`
//! as a handler.
//!
//! Since they implement [`DispatcherHandler`] too.
@ -46,14 +46,17 @@
//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot
pub mod dialogue;
pub mod stop_token;
pub mod update_listeners;
pub(crate) mod repls;
mod dispatcher;
mod dispatcher_handler;
mod dispatcher_handler_rx_ext;
pub(crate) mod repls;
pub mod update_listeners;
mod update_with_cx;
pub use dispatcher::Dispatcher;
pub use dispatcher::{Dispatcher, IdleShutdownError, ShutdownToken};
pub use dispatcher_handler::DispatcherHandler;
pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt;
use tokio::sync::mpsc::UnboundedReceiver;

View file

@ -22,6 +22,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl<R, Cmd, H, Fut, HandlerE, N>(requester: R, bot_name: N, handler: H)
where
Cmd: BotCommand + Send + 'static,
@ -56,6 +57,7 @@ where
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`commands_repl`]: crate::dispatching::repls::commands_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>(
requester: R,
bot_name: N,
@ -87,6 +89,7 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, Handl
},
)
})
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -23,6 +23,7 @@ use teloxide_core::{requests::Requester, types::Message};
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H)
where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
@ -55,6 +56,7 @@ where
/// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
requester: R,
handler: H,
@ -85,6 +87,7 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
}
},
))
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -21,6 +21,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn repl<R, H, Fut, E>(requester: R, handler: H)
where
H: Fn(UpdateWithCx<R, Message>) -> Fut + Send + Sync + 'static,
@ -51,6 +52,7 @@ where
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`repl`]: crate::dispatching::repls::repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>(
requester: R,
handler: H,
@ -76,6 +78,7 @@ pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>(
}
})
})
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -0,0 +1,76 @@
//! A stop token used to stop a listener.
use std::{future::Future, pin::Pin, task};
use futures::future::{pending, AbortHandle, Abortable, Pending};
/// A stop token allows you to stop a listener.
///
/// See also: [`UpdateListener::stop_token`].
///
/// [`UpdateListener::stop_token`]:
/// crate::dispatching::update_listeners::UpdateListener::stop_token
pub trait StopToken {
/// Stop the listener linked to this token.
fn stop(self);
}
/// A stop token which does nothing. May be used in prototyping or in cases
/// where you do not care about graceful shutdowning.
pub struct Noop;
impl StopToken for Noop {
fn stop(self) {}
}
/// A stop token which corresponds to [`AsyncStopFlag`].
#[derive(Clone)]
pub struct AsyncStopToken(AbortHandle);
/// A flag which corresponds to [`AsyncStopToken`].
///
/// To know if the stop token was used you can either repeatedly call
/// [`is_stopped`] or use this type as a `Future`.
///
/// [`is_stopped`]: AsyncStopFlag::is_stopped
#[pin_project::pin_project]
pub struct AsyncStopFlag(#[pin] Abortable<Pending<()>>);
impl AsyncStopToken {
/// Create a new token/flag pair.
pub fn new_pair() -> (Self, AsyncStopFlag) {
let (handle, reg) = AbortHandle::new_pair();
let token = Self(handle);
let flag = AsyncStopFlag(Abortable::new(pending(), reg));
(token, flag)
}
}
impl StopToken for AsyncStopToken {
fn stop(self) {
self.0.abort()
}
}
impl AsyncStopFlag {
/// Returns true if the stop token linked to `self` was used.
pub fn is_stopped(&self) -> bool {
self.0.is_aborted()
}
}
/// This future resolves when a stop token was used.
impl Future for AsyncStopFlag {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().0.poll(cx).map(|res| {
debug_assert!(
res.is_err(),
"Pending Future can't ever be resolved, so Abortable is only resolved when \
canceled"
);
})
}
}

View file

@ -96,133 +96,84 @@
//!
//! [`UpdateListener`]: UpdateListener
//! [`polling_default`]: polling_default
//! [`polling`]: polling
//! [`polling`]: polling()
//! [`Box::get_updates`]: crate::requests::Requester::get_updates
//! [getting updates]: https://core.telegram.org/bots/api#getting-updates
//! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
//! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science)
//! [webhook]: https://en.wikipedia.org/wiki/Webhook
use futures::{stream, Stream, StreamExt};
use futures::Stream;
use std::{convert::TryInto, time::Duration};
use teloxide_core::{
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
use std::time::Duration;
use crate::{dispatching::stop_token::StopToken, types::Update};
mod polling;
mod stateful_listener;
pub use self::{
polling::{polling, polling_default},
stateful_listener::StatefulListener,
};
/// A generic update listener.
pub trait UpdateListener<E>: Stream<Item = Result<Update, E>> {
// TODO: add some methods here (.shutdown(), etc).
}
impl<S, E> UpdateListener<E> for S where S: Stream<Item = Result<Update, E>> {}
/// An update listener.
///
/// Implementors of this trait allow getting updates from Telegram.
///
/// Currently Telegram has 2 ways of getting updates -- [polling] and
/// [webhooks]. Currently, only the former one is implemented (see [`polling()`]
/// and [`polling_default`])
///
/// Some functions of this trait are located in the supertrait
/// ([`AsUpdateStream`]), see also:
/// - [`AsUpdateStream::Stream`]
/// - [`AsUpdateStream::as_stream`]
///
/// [polling]: self#long-polling
/// [webhooks]: self#webhooks
pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
/// The type of token which allows to stop this listener.
type StopToken: StopToken;
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
/// Returns a token which stops this listener.
///
/// The [`stop`] function of the token is not guaranteed to have an
/// immediate effect. That is, some listeners can return updates even
/// after [`stop`] is called (e.g.: because of buffering).
///
/// [`stop`]: StopToken::stop
///
/// Implementors of this function are encouraged to stop listening for
/// updates as soon as possible and return `None` from the update stream as
/// soon as all cached updates are returned.
#[must_use = "This function doesn't stop listening, to stop listening you need to call stop on \
the returned token"]
fn stop_token(&mut self) -> Self::StopToken;
/// The timeout duration hint.
///
/// This hints how often dispatcher should check for a shutdown. E.g., for
/// [`polling()`] this returns the [`timeout`].
///
/// [`timeout`]: crate::payloads::GetUpdates::timeout
///
/// If you are implementing this trait and not sure what to return from this
/// function, just leave it with the default implementation.
fn timeout_hint(&self) -> Option<Duration> {
None
}
}
/// Returns a long/short polling update listener with some additional options.
/// [`UpdateListener`]'s supertrait/extension.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::payloads::GetUpdates
pub fn polling<R>(
requester: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
/// This trait is a workaround to not require GAT.
pub trait AsUpdateStream<'a, E> {
/// The stream of updates from Telegram.
type Stream: Stream<Item = Result<Update, E>> + 'a;
stream::unfold(
(allowed_updates, requester, 0),
move |(mut allowed_updates, bot, mut offset)| async move {
let mut req = bot.get_updates_fault_tolerant();
let payload = &mut req.payload_mut().0;
payload.offset = Some(offset);
payload.timeout = timeout;
payload.limit = limit;
payload.allowed_updates = allowed_updates.take();
let updates = match req.send().await {
Err(err) => vec![Err(err)],
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
offset = id + 1;
}
for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok).collect::<Vec<_>>()
}
};
Some((stream::iter(updates), (allowed_updates, bot, offset)))
},
)
.flatten()
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = !webhook_info.url.is_empty();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
/// Creates the update [`Stream`].
///
/// [`Stream`]: AsUpdateStream::Stream
fn as_stream(&'a mut self) -> Self::Stream;
}

View file

@ -0,0 +1,173 @@
use std::{convert::TryInto, time::Duration};
use futures::{
future::{ready, Either},
stream::{self, Stream, StreamExt},
};
use crate::{
dispatching::{
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener},
},
payloads::GetUpdates,
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
};
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
}
/// Returns a long/short polling update listener with some additional options.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::payloads::GetUpdates
pub fn polling<R>(
requester: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
struct State<B: Requester> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
flag: AsyncStopFlag,
token: AsyncStopToken,
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_
where
B: Requester,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;
if flag.is_stopped() {
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: Some(0),
limit: Some(1),
allowed_updates: allowed_updates.take(),
};
return match req.send().await {
Ok(_) => None,
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
};
}
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};
let updates = match req.send().await {
Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)),
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
*offset = id + 1;
}
for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok)
}
};
Some((Either::Right(stream::iter(updates)), state))
})
.flatten()
}
let (token, flag) = AsyncStopToken::new_pair();
let state = State {
bot: requester,
timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit,
allowed_updates,
offset: 0,
flag,
token,
};
let stop_token = |st: &mut State<_>| st.token.clone();
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener { state, stream, stop_token, timeout_hint }
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = !webhook_info.url.is_empty();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
}

View file

@ -0,0 +1,130 @@
use std::time::Duration;
use futures::Stream;
use teloxide_core::types::Update;
use crate::dispatching::{
stop_token::{self, StopToken},
update_listeners::{AsUpdateStream, UpdateListener},
};
/// A listener created from functions.
///
/// This type allows to turn a stream of updates (+ some additional functions)
/// into an [`UpdateListener`].
///
/// For an example of usage, see [`polling`].
///
/// [`polling`]: crate::dispatching::update_listeners::polling()
#[non_exhaustive]
pub struct StatefulListener<St, Assf, Sf, Thf> {
/// The state of the listener.
pub state: St,
/// The function used as [`AsUpdateStream::as_stream`].
///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`.
pub stream: Assf,
/// The function used as [`UpdateListener::stop_token`].
///
/// Must be of type `for<'a> &'a mut St -> impl StopToken`.
pub stop_token: Sf,
/// The function used as [`UpdateListener::timeout_hint`].
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
/// `&`.
pub timeout_hint: Option<Thf>,
}
impl<St, Assf, Sf> StatefulListener<St, Assf, Sf, for<'a> fn(&'a St) -> Option<Duration>> {
/// Creates a new stateful listener from its components.
pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self {
Self { state, stream, stop_token, timeout_hint: None }
}
}
impl<St, Assf, Sf, Thf> StatefulListener<St, Assf, Sf, Thf> {
/// Creates a new stateful listener from its components.
pub fn new_with_timeout_hint(
state: St,
stream: Assf,
stop_token: Sf,
timeout_hint: Option<Thf>,
) -> Self {
Self { state, stream, stop_token, timeout_hint }
}
}
impl<S, E>
StatefulListener<
S,
for<'a> fn(&'a mut S) -> &'a mut S,
for<'a> fn(&'a mut S) -> stop_token::Noop,
for<'a> fn(&'a S) -> Option<Duration>,
>
where
S: Stream<Item = Result<Update, E>> + Unpin + 'static,
{
/// Creates a new update listener from a stream of updates which ignores
/// stop signals.
///
/// It won't be possible to ever stop this listener with a stop token.
pub fn from_stream_without_graceful_shutdown(stream: S) -> Self {
let this = Self {
state: stream,
stream: |s| s,
stop_token: |_| stop_token::Noop,
timeout_hint: Some(|_| {
// FIXME: replace this by just Duration::MAX once 1.53 releases
// be released
const NANOS_PER_SEC: u32 = 1_000_000_000;
let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1);
Some(dmax)
}),
};
assert_update_listener(this)
}
}
impl<'a, St, Assf, Sf, Thf, Strm, E> AsUpdateStream<'a, E> for StatefulListener<St, Assf, Sf, Thf>
where
(St, Strm): 'a,
Assf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>,
{
type Stream = Strm;
fn as_stream(&'a mut self) -> Self::Stream {
(self.stream)(&mut self.state)
}
}
impl<St, Assf, Sf, Stt, Thf, E> UpdateListener<E> for StatefulListener<St, Assf, Sf, Thf>
where
Self: for<'a> AsUpdateStream<'a, E>,
Sf: FnMut(&mut St) -> Stt,
Stt: StopToken,
Thf: Fn(&St) -> Option<Duration>,
{
type StopToken = Stt;
fn stop_token(&mut self) -> Stt {
(self.stop_token)(&mut self.state)
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout_hint.as_ref().and_then(|f| f(&self.state))
}
}
fn assert_update_listener<L, E>(l: L) -> L
where
L: UpdateListener<E>,
{
l
}

View file

@ -9,6 +9,7 @@
| `macros` | Re-exports macros from [`teloxide-macros`]. |
| `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). |
| `rustls` | Enables the [`rustls`] TLS implementation. |
| `ctrlc_handler` | Enables the [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. |
| `auto-send` | Enables the `AutoSend` bot adaptor. |
| `cache-me` | Enables the `CacheMe` bot adaptor. |
| `frunk` | Enables [`teloxide::utils::UpState`]. |

View file

@ -54,6 +54,7 @@
// $ RUSTDOCFLAGS="--cfg docsrs -Znormalize-docs" cargo +nightly doc --open --all-features
// ```
#![cfg_attr(all(docsrs, feature = "nightly"), feature(doc_cfg))]
#![allow(clippy::redundant_pattern_matching)]
pub use dispatching::repls::{
commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl,

View file

@ -1,7 +1,7 @@
/// Enables logging through [pretty-env-logger].
///
/// A logger will **only** print errors from teloxide and **all** logs from
/// your program.
/// A logger will **only** print errors, warnings, and general information from
/// teloxide and **all** logs from your program.
///
/// # Example
/// ```no_compile
@ -23,8 +23,8 @@ macro_rules! enable_logging {
/// Enables logging through [pretty-env-logger] with a custom filter for your
/// program.
///
/// A logger will **only** print errors from teloxide and restrict logs from
/// your program by the specified filter.
/// A logger will **only** print errors, warnings, and general information from
/// teloxide and restrict logs from your program by the specified filter.
///
/// # Example
/// Allow printing all logs from your program up to [`LevelFilter::Debug`] (i.e.
@ -46,7 +46,7 @@ macro_rules! enable_logging_with_filter {
pretty_env_logger::formatted_builder()
.write_style(pretty_env_logger::env_logger::WriteStyle::Auto)
.filter(Some(&env!("CARGO_PKG_NAME").replace("-", "_")), $filter)
.filter(Some("teloxide"), log::LevelFilter::Error)
.filter(Some("teloxide"), log::LevelFilter::Info)
.init();
};
}