diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 00000000..96317650 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,4 @@ +contact_links: + - name: Teloxide Discussions + url: https://github.com/teloxide/teloxide/discussions/categories/q-a + about: Please ask and answer questions here. diff --git a/.github/ISSUE_TEMPLATE/parse-error.md b/.github/ISSUE_TEMPLATE/parse-error.md index 0e3a05b8..641c99e4 100644 --- a/.github/ISSUE_TEMPLATE/parse-error.md +++ b/.github/ISSUE_TEMPLATE/parse-error.md @@ -2,7 +2,7 @@ name: Parse error about: Report issue with `teloxide` parsing of telegram response title: 'Parse Error: ' -labels: FIXME, bug +labels: bug, FIXME, core assignees: WaffleLapkin --- diff --git a/.github/ISSUE_TEMPLATE/unknown-telegram-error.md b/.github/ISSUE_TEMPLATE/unknown-telegram-error.md index 6b5eafe2..98ab04dd 100644 --- a/.github/ISSUE_TEMPLATE/unknown-telegram-error.md +++ b/.github/ISSUE_TEMPLATE/unknown-telegram-error.md @@ -2,7 +2,7 @@ name: Unknown telegram error about: You've found telegram error which is not known to teloxide title: 'Unknown Error: ' -labels: FIXME, bug, good first issue +labels: bug, good first issue, FIXME, core, Unknown API error assignees: '' --- diff --git a/CHANGELOG.md b/CHANGELOG.md index 256bc43e..ac7d6ca8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,53 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [unreleased] -## [0.4.0] - 2021-03-19 +## [0.5.0] - 2021-07-08 + +### Added + + - `Storage::get_dialogue` to obtain a dialogue indexed by a chat ID. + - `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`. + - `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`. + - A way to `shutdown` dispatcher + - `Dispatcher::shutdown_token` function. + - `ShutdownToken` with a `shutdown` function. + - `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)). + - `IdleShutdownError` + - Automatic update filtering ([issue 389](https://github.com/teloxide/teloxide/issues/389)). + - Added reply shortcut to every kind of messages ([PR 404](https://github.com/teloxide/teloxide/pull/404)). + +### Changed + + - Do not return a dialogue from `Storage::{remove_dialogue, update_dialogue}`. + - Return an error from `Storage::remove_dialogue` if a dialogue does not exist. + - Require `D: Clone` in `dialogues_repl(_with_listener)` and `InMemStorage`. + - Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)). + - `polling` and `polling_default` now require `R: 'static` + - Refactor `UpdateListener` trait: + - Add a `StopToken` associated type. + - It must implement a new `StopToken` trait which has the only function `fn stop(self);` + - Add a `stop_token` function that returns `Self::StopToken` and allows stopping the listener later ([issue 166](https://github.com/teloxide/teloxide/issues/166)). + - Remove blanked implementation. + - Remove `Stream` from super traits. + - Add `AsUpdateStream` to super traits. + - Add an `AsUpdateStream` trait that allows turning implementors into streams of updates (GAT workaround). + - Add a `timeout_hint` function (with a default implementation). + - `Dispatcher::dispatch` and `Dispatcher::dispatch_with_listener` now require mutable reference to self. + - Repls can now be stopped by `^C` signal. + - `Noop` and `AsyncStopToken`stop tokens. + - `StatefulListener`. + - Emit not only errors but also warnings and general information from teloxide, when set up by `enable_logging!`. + - Use `i64` instead of `i32` for `user_id` in `html::user_mention` and `markdown::user_mention`. + - Updated to `teloxide-core` `v0.3.0` (see it's [changelog](https://github.com/teloxide/teloxide-core/blob/master/CHANGELOG.md#030---2021-07-05) for more) + +### Fixed + + - Remove the `reqwest` dependency. It's not needed after the [teloxide-core] integration. + - A storage persistency bug ([issue 304](https://github.com/teloxide/teloxide/issues/304)). + - Log errors from `Storage::{remove_dialogue, update_dialogue}` in `DialogueDispatcher` ([issue 302](https://github.com/teloxide/teloxide/issues/302)). + - Mark all the functions of `Storage` as `#[must_use]`. + +## [0.4.0] - 2021-03-22 ### Added - Integrate [teloxide-core]. diff --git a/CODE_STYLE.md b/CODE_STYLE.md index 964aea8c..0fa0756b 100644 --- a/CODE_STYLE.md +++ b/CODE_STYLE.md @@ -124,3 +124,4 @@ C: Into, { ... } 1. Use `Into<...>` only where there exists at least one conversion **and** it will be logically to use. 2. Always mark a function as `#[must_use]` if its return value **must** be used. 3. `Box::pin(async [move] { ... })` instead of `async [move] { ... }.boxed()`. + 4. Always write `log::!(...)` instead of importing `use log::;` and invoking `!(...)`. For example, write `log::info!("blah")`. diff --git a/Cargo.toml b/Cargo.toml index 8873a8c2..a6410918 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,17 @@ [package] name = "teloxide" -version = "0.4.0" +version = "0.5.0" edition = "2018" description = "An elegant Telegram bots framework for Rust" repository = "https://github.com/teloxide/teloxide" documentation = "https://docs.rs/teloxide/" readme = "README.md" keywords = ["teloxide", "telegram", "telegram-bot", "telegram-bot-api"] +categories = ["web-programming", "api-bindings", "asynchronous"] license = "MIT" exclude = ["media"] authors = [ - "Temirkhan Myrzamadi ", + "Hirrolot ", "Waffle Lapkin ", "p0lunin ", "Mishko torop'izhko", @@ -24,6 +25,8 @@ authors = [ maintenance = { status = "actively-developed" } [features] +default = ["native-tls", "ctrlc_handler", "teloxide-core/default"] + sqlite-storage = ["sqlx"] redis-storage = ["redis"] cbor-serializer = ["serde_cbor"] @@ -32,6 +35,8 @@ bincode-serializer = ["bincode"] frunk- = ["frunk"] macros = ["teloxide-macros"] +ctrlc_handler = ["tokio/signal"] + native-tls = ["teloxide-core/native-tls"] rustls = ["teloxide-core/rustls"] auto-send = ["teloxide-core/auto_send"] @@ -49,6 +54,7 @@ full = [ "bincode-serializer", "frunk", "macros", + "ctrlc_handler", "teloxide-core/full", "native-tls", "rustls", @@ -58,19 +64,19 @@ full = [ ] [dependencies] -teloxide-core = { version = "0.2.1", default-features = false } +teloxide-core = { version = "0.3.1", default-features = false } +#teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "...", default-features = false } teloxide-macros = { version = "0.4", optional = true } serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.2", features = ["fs"] } +tokio = { version = "1.8", features = ["fs"] } tokio-util = "0.6" tokio-stream = "0.1" -reqwest = { version = "0.11", features = ["json", "stream"] } +flurry = "0.3" log = "0.4" -lockfree = "0.5.1" bytes = "1.0" mime = "0.3" @@ -89,18 +95,19 @@ sqlx = { version = "0.5", optional = true, default-features = false, features = redis = { version = "0.20", features = ["tokio-comp"], optional = true } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } -frunk = { version = "0.3", optional = true } +frunk = { version = "0.4", optional = true } [dev-dependencies] smart-default = "0.6.0" rand = "0.8.3" pretty_env_logger = "0.4.0" lazy_static = "1.4.0" -tokio = { version = "1.2.0", features = ["fs", "rt-multi-thread", "macros"] } +tokio = { version = "1.8", features = ["fs", "rt-multi-thread", "macros"] } [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = ["--cfg", "docsrs", "-Znormalize-docs"] +rustc-args = ["--cfg", "dep_docsrs"] [[test]] name = "redis" diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md new file mode 100644 index 00000000..d945a6e6 --- /dev/null +++ b/MIGRATION_GUIDE.md @@ -0,0 +1,162 @@ +This document describes breaking changes of `teloxide` crate, as well as the ways to update code. +Note that the list of required changes is not fully exhaustive and it may lack something in rare cases. + +## 0.4 -> 0.5 + +### core + +#### Field type changes + +Types of some fields were changed to be more accurate. +If you used them, you may need to change types in your code too. + +Example: +```diff +let ps: PhotoSize = /* ... */; +-let w: i32 = ps.width; ++let w: u32 = ps.width; +``` + +List of changed types: +- `PhotoSoze::width`: `i32` -> `u32` +- `PhotoSoze::height`: `i32` -> `u32` +- `Restricted::until_date`: `i32` -> `DateTime` +- `Kicked::until_date` (`Banned::until_date`): `i32` -> `DateTime` +- `PublicChatSupergroup::slow_mode_delay`: `Option` -> `Option` +- `User::id`: `i32` -> `i64` (note: all methods which are accepting `user_id` were changed too) + + +#### Method output types + +In teloxide `v0.4` (core `v0.2`) some API methods had wrong return types. +This made them practically unusable as they've always returned parsing error. +On the offchance you were using the methods, you may need to adjust types in your code. + +List of changed return types: +- `get_chat_administrators`: `ChatMember` -> `Vec` +- `send_chat_action`: `Message` -> `True` +- `leave_chat`: `String` -> `True` +- `pin_chat_message`: `String` -> `True` +- `set_chat_description`: `String` -> `True` +- `set_chat_photo`: `String` -> `True` +- `set_chat_title`: `String` -> `True` +- `unpin_all_chat_messages`: `String` -> `True` +- `unpin_chat_message`: `String` -> `True` + + +#### Method parameter types + +Some API methods accept different types now. +If you've used changed parameters, you need to adjust code for new types. + +Examples: +```diff +let bot = Bot::new("TOKEN").auto_send(); + +-bot.set_webhook("url").await?; ++bot.set_webhook(Url::parse("url").unwrap()).await?; + +let link = bot + .create_chat_invite_link(chat_id) +- .expire_date(timestamp) +# Note: this is not the only way to create `DateTime`. Refer to `chrono` docs for more. ++ .expire_date(DateTime::::from_utc( ++ NaiveDateTime::from_timestamp(timestamp, 0), Utc) ++ ) + .await?; +``` + +See also: [teloxide examples fixes](https://github.com/teloxide/teloxide/pull/408/files/369e43aa7ed1b192d326e6bdfe76f3560001353f..18f88cc034e97fd437c48930728c1d5d2da7a14d). + +List of changed required params: +- `SetWebhook::url`: `String` -> `Url` + +List of changed optional params: +- `AnswerCallbackQuery::url`: `String` -> `Url` +- `SendInvoice::photo_url`: `String` -> `Url` +- `CreateChatInviteLink::expire_date`: `i64` -> `DateTime` +- `EditChatInviteLink::expire_date`: `i64` -> `DateTime` +- `KickChatMember::until_date`: `u64` -> `DateTime` +- `RestrictChatMember::until_date`: `u64` -> `DateTime` +- `SendPoll::close_date`: `u64` -> `DateTime` + + +#### Renamed items + +Some items (fields, variants, types, methods) were renamed. +If you used them, you should start using new names. + +Example: +```diff +-bot.send_chat_action(chat, ChatAction::RecordAudio).await?; ++bot.send_chat_action(chat, ChatAction::RecordVoice).await?; + +-if chat_member.is_kicked() { ++if chat_member.is_banned() { + /* ... */ +} +``` + +List of renamed items: +- `ChatAction::RecordAudio` -> `RecordVoice` +- `ChatAction::UploadAudio` -> `UploadVoice` +- `ChatMemberKind::Creator` -> `Owner` +- `ChatMemberKind::Kicked` -> `Banned` +- `Creator` -> `Owner` +- `Kicked` -> `Banned` +- `ChatMemberKind::is_Creator` -> `is_owner` * +- `ChatMemberKind::is_kicked` -> `is_banned` * +- `ChatMemberStatus::Creator` -> `Owner` +- `ChatMemberStatus::Kicked` -> `Banned` +- `kick_chat_member` -> `ban_chat_member` * +- `get_chat_members_count` -> `get_chat_member_count` * + +\* Old methods are still accessible, but deprecated + + +#### Added `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`} + +Previously said bot adaptors were lacking `Clone` implementation. +To workaround this issue it was proposed to wrap bot in `Arc`. +Now it's not required, so you can remove the `Arc`: + +```diff +let bot = Bot::new(token).parse_mode(ParseMode::MarkdownV2); +-let bot = Arc::new(bot); +``` + + +### teloxide + +#### Mutable reference for dispatching + +`Dispatcher::dispatch` and `Dispatcher::dispatch_with_listener` now require mutable (unique) reference to self. +If you've used variable to store `Dispatcher`, you need to make it mutable: + +```diff +-let dp = Dispatcher::new(); ++let mut dp = Dispatcher::new(); +/* ... */ +dp.dispatch(); +``` + + +#### Listener refactor + +`UpdateListener` trait was refactored. +If you've used `polling`/`polling_default` provided by teloxide, no changes are required. +If, however, you've used or implemented `UpdateListener` directly or used a `Stream` as a listener, +then you need to refactor your code too. + +See also: [teloxide examples fixes](https://github.com/teloxide/teloxide/pull/385/files/8785b8263cb4caebf212e2a66a19f73e653eb060..c378d6ef4e524da96718beec6f989e8ac51d1531). + + +#### `polling_default` + +`polling_default` is now async, but removes webhook. + +Example fix: +```diff +-let listener = polling_default(bot); ++let listener = polling_default(bot).await; +``` diff --git a/README.md b/README.md index 874f0703..6e2a1a86 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +[_v0.4.0 => v0.5.0 migration guide >>_](MIGRATION_GUIDE.md#04---05) + -## Table of contents - - [Highlights](#highlights) - - [Setting up your environment](#setting-up-your-environment) - - [API overview](#api-overview) - - [The dices bot](#the-dices-bot) - - [Commands](#commands) - - [Dialogues management](#dialogues-management) - - [Recommendations](#recommendations) - - [Cargo features](#cargo-features) - - [FAQ](#faq) - - [Community bots](#community-bots) - - [Contributing](#contributing) - ## Highlights - **Functional reactive design.** teloxide follows [functional reactive design], allowing you to declaratively manipulate streams of updates from Telegram using filters, maps, folds, zips, and a lot of [other adaptors]. @@ -79,10 +68,10 @@ $ rustup override set nightly 5. Run `cargo new my_bot`, enter the directory and put these lines into your `Cargo.toml`: ```toml [dependencies] -teloxide = "0.4" +teloxide = { version = "0.4", features = ["auto-send", "macros"] } log = "0.4.8" pretty_env_logger = "0.4.0" -tokio = { version = "1.3", features = ["rt-threaded", "macros"] } +tokio = { version = "1.3", features = ["rt-multi-thread", "macros"] } ``` ## API overview @@ -147,7 +136,7 @@ async fn answer( command: Command, ) -> Result<(), Box> { match command { - Command::Help => cx.answer(Command::descriptions()).send().await?, + Command::Help => cx.answer(Command::descriptions()).await?, Command::Username(username) => { cx.answer(format!("Your username is @{}.", username)).await? } @@ -178,7 +167,7 @@ async fn main() { ### Dialogues management -A dialogue is described by an enumeration where each variant is one of possible dialogue's states. There are also _subtransition functions_, which turn a dialogue from one state to another, thereby forming a [FSM]. +A dialogue is described by an enumeration where each variant is one of possible dialogue's states. There are also _subtransition functions_, which turn a dialogue from one state to another, thereby forming an [FSM]. [FSM]: https://en.wikipedia.org/wiki/Finite-state_machine @@ -378,29 +367,6 @@ async fn handle_message( The second one produces very strange compiler messages due to the `#[tokio::main]` macro. However, the examples in this README use the second variant for brevity. -## Cargo features - - - `redis-storage` -- enables the [Redis] support. - - `sqlite-storage` -- enables the [Sqlite] support. - - `cbor-serializer` -- enables the [CBOR] serializer for dialogues. - - `bincode-serializer` -- enables the [Bincode] serializer for dialogues. - - `frunk` -- enables [`teloxide::utils::UpState`], which allows mapping from a structure of `field1, ..., fieldN` to a structure of `field1, ..., fieldN, fieldN+1`. - - `macros` -- re-exports macros from [`teloxide-macros`]. - - `native-tls` -- enables the [`native-tls`] TLS implementation (enabled by default). - - `rustls` -- enables the [`rustls`] TLS implementation. - - `auto-send` -- enables `AutoSend` bot adaptor. - - `cache-me` -- enables the `CacheMe` bot adaptor. - - `full` -- enables all the features except `nightly`. - - `nightly` -- enables nightly-only features (see the [teloxide-core's features]). - -[CBOR]: https://en.wikipedia.org/wiki/CBOR -[Bincode]: https://github.com/servo/bincode -[`teloxide::utils::UpState`]: https://docs.rs/teloxide/latest/teloxide/utils/trait.UpState.html -[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros -[`native-tls`]: https://docs.rs/native-tls -[`rustls`]: https://docs.rs/rustls -[teloxide-core's features]: https://docs.rs/teloxide-core/0.2.1/teloxide_core/#cargo-features - ## FAQ **Q: Where I can ask questions?** @@ -443,15 +409,21 @@ A: Yes. You can setup any logger, for example, [fern], e.g. teloxide has no spec [`enable_logging_with_filter!`]: https://docs.rs/teloxide/latest/teloxide/macro.enable_logging_with_filter.html ## Community bots -Feel free to push your own bot into our collection! +Feel free to propose your own bot to our collection! - - [_steadylearner/subreddit_reader_](https://github.com/steadylearner/Rust-Full-Stack/tree/master/commits/teloxide/subreddit_reader) - - [_ArtHome12/vzmuinebot -- Telegram bot for food menu navigate_](https://github.com/ArtHome12/vzmuinebot) - - [_Hermitter/tepe -- A CLI to command a bot to send messages and files over Telegram_](https://github.com/Hermitter/tepe) - - [_ArtHome12/cognito_bot -- The bot is designed to anonymize messages to a group_](https://github.com/ArtHome12/cognito_bot) - - [_GoldsteinE/tg-vimhelpbot -- Link `:help` for Vim in Telegram_](https://github.com/GoldsteinE/tg-vimhelpbot) - - [_sschiz/janitor-bot_ -- A bot that removes users trying to join to a chat that is designed for comments](https://github.com/sschiz/janitor-bot) - - [ myblackbeard/basketball-betting-bot -- The bot lets you bet on NBA games against your buddies](https://github.com/myblackbeard/basketball-betting-bot) + - [dracarys18/grpmr-rs](https://github.com/dracarys18/grpmr-rs) -- A telegram group manager bot with variety of extra features. + - [steadylearner/subreddit_reader](https://github.com/steadylearner/Rust-Full-Stack/tree/master/commits/teloxide/subreddit_reader) -- A bot that shows the latest posts at Rust subreddit. + - [ArtHome12/vzmuinebot](https://github.com/ArtHome12/vzmuinebot) -- Telegram bot for food menu navigate. + - [ArtHome12/cognito_bot](https://github.com/ArtHome12/cognito_bot) -- The bot is designed to anonymize messages to a group. + - [Hermitter/tepe](https://github.com/Hermitter/tepe) -- A CLI to command a bot to send messages and files over Telegram. + - [pro-vim/tg-vimhelpbot](https://github.com/pro-vim/tg-vimhelpbot) -- Link `:help` for Vim in Telegram. + - [sschiz/janitor-bot](https://github.com/sschiz/janitor-bot) -- A bot that removes users trying to join to a chat that is designed for comments. + - [myblackbeard/basketball-betting-bot](https://github.com/myblackbeard/basketball-betting-bot) -- The bot lets you bet on NBA games against your buddies. + - [slondr/BeerHolderBot](https://gitlab.com/slondr/BeerHolderBot) -- A bot that holds your beer. + - [mxseev/logram](https://github.com/mxseev/logram) -- Utility that takes logs from anywhere and sends them to Telegram. + - [msfjarvis/walls-bot-rs](https://github.com/msfjarvis/walls-bot-rs) -- Telegram bot for my wallpapers collection, in Rust. + - [MustafaSalih1993/Miss-Vodka-Telegram-Bot](https://github.com/MustafaSalih1993/Miss-Vodka-Telegram-Bot) -- A telegram bot written in rust using "Teloxide" library. + - [x13a/tg-prompt](https://github.com/x13a/tg-prompt) -- Telegram prompt. ## Contributing See [CONRIBUTING.md](https://github.com/teloxide/teloxide/blob/master/CONTRIBUTING.md). diff --git a/examples/admin_bot/Cargo.toml b/examples/admin_bot/Cargo.toml index ccaf49c0..52c6f017 100644 --- a/examples/admin_bot/Cargo.toml +++ b/examples/admin_bot/Cargo.toml @@ -11,6 +11,7 @@ teloxide = { path = "../../", features = ["macros", "auto-send"] } log = "0.4.8" pretty_env_logger = "0.4.0" tokio = { version = "1.3.0", features = ["rt-multi-thread", "macros"] } +chrono = "0.4" [profile.release] lto = true diff --git a/examples/admin_bot/src/main.rs b/examples/admin_bot/src/main.rs index 0351f9bd..904c9369 100644 --- a/examples/admin_bot/src/main.rs +++ b/examples/admin_bot/src/main.rs @@ -1,8 +1,7 @@ -use std::{convert::TryInto, error::Error, str::FromStr}; +use std::{error::Error, str::FromStr}; -use teloxide::{prelude::*, utils::command::BotCommand}; - -use teloxide::types::ChatPermissions; +use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use teloxide::{prelude::*, types::{ChatPermissions, Me}, utils::command::BotCommand}; // Derive BotCommand to parse text with a command into this enumeration. // @@ -24,12 +23,12 @@ enum Command { Kick, #[command(description = "ban user in chat.")] Ban { - time: u32, + time: u64, unit: UnitOfTime, }, #[command(description = "mute user in chat.")] Mute { - time: u32, + time: u64, unit: UnitOfTime, }, Help, @@ -54,18 +53,18 @@ impl FromStr for UnitOfTime { } // Calculates time of user restriction. -fn calc_restrict_time(time: u32, unit: UnitOfTime) -> u32 { +fn calc_restrict_time(time: u64, unit: UnitOfTime) -> Duration { match unit { - UnitOfTime::Hours => time * 3600, - UnitOfTime::Minutes => time * 60, - UnitOfTime::Seconds => time, + UnitOfTime::Hours => Duration::hours(time as i64), + UnitOfTime::Minutes => Duration::minutes(time as i64), + UnitOfTime::Seconds => Duration::seconds(time as i64), } } type Cx = UpdateWithCx, Message>; // Mute a user with a replied message. -async fn mute_user(cx: &Cx, time: u32) -> Result<(), Box> { +async fn mute_user(cx: &Cx, time: Duration) -> Result<(), Box> { match cx.update.reply_to_message() { Some(msg1) => { cx.requester @@ -74,7 +73,12 @@ async fn mute_user(cx: &Cx, time: u32) -> Result<(), Box::from_utc( + NaiveDateTime::from_timestamp(cx.update.date as i64, 0), + Utc, + ) + time, + ) .await?; } None => { @@ -102,7 +106,7 @@ async fn kick_user(cx: &Cx) -> Result<(), Box> { } // Ban a user with replied message. -async fn ban_user(cx: &Cx, time: u32) -> Result<(), Box> { +async fn ban_user(cx: &Cx, time: Duration) -> Result<(), Box> { match cx.update.reply_to_message() { Some(message) => { cx.requester @@ -110,7 +114,12 @@ async fn ban_user(cx: &Cx, time: u32) -> Result<(), Box cx.update.chat_id(), message.from().expect("Must be MessageKind::Common").id, ) - .until_date((cx.update.date + time as i32).try_into().unwrap()) + .until_date( + DateTime::::from_utc( + NaiveDateTime::from_timestamp(cx.update.date as i64, 0), + Utc, + ) + time, + ) .await?; } None => { @@ -142,6 +151,7 @@ async fn run() { let bot = Bot::from_env().auto_send(); - let bot_name: String = panic!("Your bot's name here"); + let Me { user: bot_user, .. } = bot.get_me().await.unwrap(); + let bot_name = bot_user.username.expect("Bots must have usernames"); teloxide::commands_repl(bot, bot_name, action).await; } diff --git a/examples/dialogue_bot/Cargo.toml b/examples/dialogue_bot/Cargo.toml index 45f928ea..8d2ea09a 100644 --- a/examples/dialogue_bot/Cargo.toml +++ b/examples/dialogue_bot/Cargo.toml @@ -16,8 +16,8 @@ log = "0.4.8" pretty_env_logger = "0.4.0" derive_more = "0.99.9" -frunk = "0.3.1" -frunk_core = "0.3.1" +frunk = "0.4" +frunk_core = "0.4" [profile.release] lto = true diff --git a/examples/dialogue_bot/src/dialogue/mod.rs b/examples/dialogue_bot/src/dialogue/mod.rs index 4bae759a..43ad8db3 100644 --- a/examples/dialogue_bot/src/dialogue/mod.rs +++ b/examples/dialogue_bot/src/dialogue/mod.rs @@ -6,7 +6,7 @@ use crate::dialogue::states::{ use derive_more::From; use teloxide::macros::Transition; -#[derive(Transition, From)] +#[derive(Transition, Clone, From)] pub enum Dialogue { Start(StartState), ReceiveFullName(ReceiveFullNameState), diff --git a/examples/dialogue_bot/src/dialogue/states/receive_age.rs b/examples/dialogue_bot/src/dialogue/states/receive_age.rs index 36c72a23..099b3407 100644 --- a/examples/dialogue_bot/src/dialogue/states/receive_age.rs +++ b/examples/dialogue_bot/src/dialogue/states/receive_age.rs @@ -1,7 +1,7 @@ use crate::dialogue::{states::receive_location::ReceiveLocationState, Dialogue}; use teloxide::prelude::*; -#[derive(Generic)] +#[derive(Clone, Generic)] pub struct ReceiveAgeState { pub full_name: String, } diff --git a/examples/dialogue_bot/src/dialogue/states/receive_full_name.rs b/examples/dialogue_bot/src/dialogue/states/receive_full_name.rs index 21d3fef2..2ea60a1c 100644 --- a/examples/dialogue_bot/src/dialogue/states/receive_full_name.rs +++ b/examples/dialogue_bot/src/dialogue/states/receive_full_name.rs @@ -1,7 +1,7 @@ use crate::dialogue::{states::receive_age::ReceiveAgeState, Dialogue}; use teloxide::prelude::*; -#[derive(Generic)] +#[derive(Clone, Generic)] pub struct ReceiveFullNameState; #[teloxide(subtransition)] diff --git a/examples/dialogue_bot/src/dialogue/states/receive_location.rs b/examples/dialogue_bot/src/dialogue/states/receive_location.rs index aaa1af2d..3c1f6407 100644 --- a/examples/dialogue_bot/src/dialogue/states/receive_location.rs +++ b/examples/dialogue_bot/src/dialogue/states/receive_location.rs @@ -1,7 +1,7 @@ use crate::dialogue::Dialogue; use teloxide::prelude::*; -#[derive(Generic)] +#[derive(Clone, Generic)] pub struct ReceiveLocationState { pub full_name: String, pub age: u8, diff --git a/examples/dialogue_bot/src/dialogue/states/start.rs b/examples/dialogue_bot/src/dialogue/states/start.rs index a4f3c192..f3f12e0c 100644 --- a/examples/dialogue_bot/src/dialogue/states/start.rs +++ b/examples/dialogue_bot/src/dialogue/states/start.rs @@ -1,6 +1,7 @@ use crate::dialogue::{states::ReceiveFullNameState, Dialogue}; use teloxide::prelude::*; +#[derive(Clone)] pub struct StartState; #[teloxide(subtransition)] diff --git a/examples/heroku_ping_pong_bot/src/main.rs b/examples/heroku_ping_pong_bot/src/main.rs index 8f02734a..653fc9f8 100644 --- a/examples/heroku_ping_pong_bot/src/main.rs +++ b/examples/heroku_ping_pong_bot/src/main.rs @@ -1,14 +1,14 @@ // The version of Heroku ping-pong-bot, which uses a webhook to receive updates // from Telegram, instead of long polling. -use teloxide::{dispatching::update_listeners, prelude::*, types::Update}; +use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update}; use std::{convert::Infallible, env, net::SocketAddr}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use warp::Filter; -use reqwest::StatusCode; +use reqwest::{StatusCode, Url}; #[tokio::main] async fn main() { @@ -20,8 +20,8 @@ async fn handle_rejection(error: warp::Rejection) -> Result(bot: AutoSend) -> impl update_listeners::UpdateListener { - // Heroku defines auto defines a port value +pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListener { + // Heroku auto defines a port value let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing"); let port: u16 = env::var("PORT") .expect("PORT env variable missing") @@ -30,7 +30,7 @@ pub async fn webhook<'a>(bot: AutoSend) -> impl update_listeners::UpdateLis // Heroku host example .: "heroku-ping-pong-bot.herokuapp.com" let host = env::var("HOST").expect("have HOST env variable"); let path = format!("bot{}", teloxide_token); - let url = format!("https://{}/{}", host, path); + let url = Url::parse(&format!("https://{}/{}", host, path)).unwrap(); bot.set_webhook(url).await.expect("Cannot setup a webhook"); @@ -48,11 +48,21 @@ pub async fn webhook<'a>(bot: AutoSend) -> impl update_listeners::UpdateLis }) .recover(handle_rejection); - let serve = warp::serve(server); + let (stop_token, stop_flag) = AsyncStopToken::new_pair(); - let address = format!("0.0.0.0:{}", port); - tokio::spawn(serve.run(address.parse::().unwrap())); - UnboundedReceiverStream::new(rx) + let addr = format!("0.0.0.0:{}", port).parse::().unwrap(); + let server = warp::serve(server); + let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag); + + // You might want to use serve.key_path/serve.cert_path methods here to + // setup a self-signed TLS certificate. + + tokio::spawn(fut); + let stream = UnboundedReceiverStream::new(rx); + + fn streamf(state: &mut (S, T)) -> &mut S { &mut state.0 } + + StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone()) } async fn run() { diff --git a/examples/ngrok_ping_pong_bot/src/main.rs b/examples/ngrok_ping_pong_bot/src/main.rs index a00c60ee..c748e0c8 100644 --- a/examples/ngrok_ping_pong_bot/src/main.rs +++ b/examples/ngrok_ping_pong_bot/src/main.rs @@ -1,14 +1,14 @@ // The version of ngrok ping-pong-bot, which uses a webhook to receive updates // from Telegram, instead of long polling. -use teloxide::{dispatching::update_listeners, prelude::*, types::Update}; +use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update}; use std::{convert::Infallible, net::SocketAddr}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use warp::Filter; -use reqwest::StatusCode; +use reqwest::{StatusCode, Url}; #[tokio::main] async fn main() { @@ -20,10 +20,12 @@ async fn handle_rejection(error: warp::Rejection) -> Result(bot: AutoSend) -> impl update_listeners::UpdateListener { +pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListener { + let url = Url::parse("Your HTTPS ngrok URL here. Get it by `ngrok http 80`").unwrap(); + // You might want to specify a self-signed certificate via .certificate // method on SetWebhook. - bot.set_webhook("Your HTTPS ngrok URL here. Get it by 'ngrok http 80'") + bot.set_webhook(url) .await .expect("Cannot setup a webhook"); @@ -40,13 +42,21 @@ pub async fn webhook<'a>(bot: AutoSend) -> impl update_listeners::UpdateLis }) .recover(handle_rejection); - let serve = warp::serve(server); + let (stop_token, stop_flag) = AsyncStopToken::new_pair(); + + let addr = "127.0.0.1:80".parse::().unwrap(); + let server = warp::serve(server); + let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag); // You might want to use serve.key_path/serve.cert_path methods here to // setup a self-signed TLS certificate. - tokio::spawn(serve.run("127.0.0.1:80".parse::().unwrap())); - UnboundedReceiverStream::new(rx) + tokio::spawn(fut); + let stream = UnboundedReceiverStream::new(rx); + + fn streamf(state: &mut (S, T)) -> &mut S { &mut state.0 } + + StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone()) } async fn run() { diff --git a/examples/simple_commands_bot/src/main.rs b/examples/simple_commands_bot/src/main.rs index 8b34c7b0..5c459402 100644 --- a/examples/simple_commands_bot/src/main.rs +++ b/examples/simple_commands_bot/src/main.rs @@ -18,7 +18,7 @@ async fn answer( command: Command, ) -> Result<(), Box> { match command { - Command::Help => cx.answer(Command::descriptions()).send().await?, + Command::Help => cx.answer(Command::descriptions()).await?, Command::Username(username) => { cx.answer(format!("Your username is @{}.", username)).await? } diff --git a/netlify.toml b/netlify.toml index 1197ce45..21b40399 100644 --- a/netlify.toml +++ b/netlify.toml @@ -1,6 +1,6 @@ [build] command = "rustup install nightly --profile minimal && cargo +nightly doc --all-features --no-deps && cp -r target/doc _netlify_out" -environment = { RUSTDOCFLAGS= "--cfg docsrs" } +environment = { RUSTFLAGS="--cfg dep_docsrs", RUSTDOCFLAGS= "--cfg docsrs -Znormalize-docs" } publish = "_netlify_out" [[redirects]] diff --git a/src/dispatching/dialogue/dialogue_dispatcher.rs b/src/dispatching/dialogue/dialogue_dispatcher.rs index 706cf952..cee26a29 100644 --- a/src/dispatching/dialogue/dialogue_dispatcher.rs +++ b/src/dispatching/dialogue/dialogue_dispatcher.rs @@ -4,12 +4,13 @@ use crate::dispatching::{ }, DispatcherHandler, UpdateWithCx, }; -use std::{convert::Infallible, marker::PhantomData}; +use std::{fmt::Debug, marker::PhantomData}; use futures::{future::BoxFuture, FutureExt, StreamExt}; use tokio::sync::mpsc; -use lockfree::map::Map; +use crate::dispatching::dialogue::InMemStorageError; +use flurry::HashMap; use std::sync::{Arc, Mutex}; use teloxide_core::requests::Requester; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -19,6 +20,11 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// Note that it implements [`DispatcherHandler`], so you can just put an /// instance of this dispatcher into the [`Dispatcher`]'s methods. /// +/// Note that when the storage methods [`Storage::remove_dialogue`] and +/// [`Storage::update_dialogue`] are failed, the errors are logged, but a result +/// from [`Storage::get_dialogue`] is provided to a user handler as-is so you +/// can respond to a concrete user with an error description. +/// /// See the [module-level documentation](crate::dispatching::dialogue) for the /// design overview. /// @@ -35,12 +41,12 @@ pub struct DialogueDispatcher { /// A value is the TX part of an unbounded asynchronous MPSC channel. A /// handler that executes updates from the same chat ID sequentially /// handles the RX part. - senders: Arc>>>, + senders: Arc>>>, } impl DialogueDispatcher, H, Upd> where - H: DialogueDispatcherHandler + Send + Sync + 'static, + H: DialogueDispatcherHandler + Send + Sync + 'static, Upd: GetChatId + Send + 'static, D: Default + Send + 'static, { @@ -53,7 +59,7 @@ where Self { storage: InMemStorage::new(), handler: Arc::new(handler), - senders: Arc::new(Map::new()), + senders: Arc::new(HashMap::new()), _phantom: PhantomData, } } @@ -65,7 +71,7 @@ where Upd: GetChatId + Send + 'static, D: Default + Send + 'static, S: Storage + Send + Sync + 'static, - S::Error: Send + 'static, + S::Error: Debug + Send + 'static, { /// Creates a dispatcher with the specified `handler` and `storage`. #[must_use] @@ -73,7 +79,7 @@ where Self { storage, handler: Arc::new(handler), - senders: Arc::new(Map::new()), + senders: Arc::new(HashMap::new()), _phantom: PhantomData, } } @@ -97,28 +103,24 @@ where async move { let chat_id = cx.update.chat_id(); - let dialogue = Arc::clone(&storage) - .remove_dialogue(chat_id) - .await - .map(Option::unwrap_or_default); + let dialogue = + Arc::clone(&storage).get_dialogue(chat_id).await.map(Option::unwrap_or_default); match handler.handle(DialogueWithCx { cx, dialogue }).await { DialogueStage::Next(new_dialogue) => { - if let Ok(Some(_)) = storage.update_dialogue(chat_id, new_dialogue).await { - panic!( - "Oops, you have an bug in your Storage: update_dialogue returns \ - Some after remove_dialogue" - ); + if let Err(e) = storage.update_dialogue(chat_id, new_dialogue).await { + log::error!("Storage::update_dialogue failed: {:?}", e); } } DialogueStage::Exit => { // On the next .poll() call, the spawned future will // return Poll::Ready, because we are dropping the // sender right here: - senders.remove(&chat_id); + senders.pin().remove(&chat_id); - // We already removed a dialogue from `storage` (see - // the beginning of this async block). + if let Err(e) = storage.remove_dialogue(chat_id).await { + log::error!("Storage::remove_dialogue failed: {:?}", e); + } } } } @@ -134,7 +136,7 @@ where Upd: GetChatId + Send + 'static, D: Default + Send + 'static, S: Storage + Send + Sync + 'static, - S::Error: Send + 'static, + S::Error: Debug + Send + 'static, R: Requester + Send, { fn handle( @@ -151,10 +153,10 @@ where let this = Arc::clone(&this); let chat_id = cx.update.chat_id(); - match this.senders.get(&chat_id) { + match this.senders.pin().get(&chat_id) { // An old dialogue Some(tx) => { - if tx.1.send(cx).is_err() { + if tx.send(cx).is_err() { panic!("We are not dropping a receiver or call .close() on it",); } } @@ -163,7 +165,7 @@ where if tx.send(cx).is_err() { panic!("We are not dropping a receiver or call .close() on it",); } - this.senders.insert(chat_id, tx); + this.senders.pin().insert(chat_id, tx); } } @@ -213,7 +215,7 @@ mod tests { } let dispatcher = DialogueDispatcher::new( - |cx: DialogueWithCx| async move { + |cx: DialogueWithCx| async move { tokio::time::sleep(Duration::from_millis(300)).await; match cx.cx.update { diff --git a/src/dispatching/dialogue/mod.rs b/src/dispatching/dialogue/mod.rs index 303d161c..097c7180 100644 --- a/src/dispatching/dialogue/mod.rs +++ b/src/dispatching/dialogue/mod.rs @@ -33,10 +33,17 @@ //! # #[cfg(feature = "macros")] { //! use std::convert::Infallible; //! -//! use teloxide::{dispatching::dialogue::Transition, prelude::*, teloxide, RequestError}; +//! use teloxide::{ +//! dispatching::dialogue::{InMemStorageError, Transition}, +//! prelude::*, +//! teloxide, RequestError, +//! }; //! +//! #[derive(Clone)] //! struct _1State; +//! #[derive(Clone)] //! struct _2State; +//! #[derive(Clone)] //! struct _3State; //! //! type Out = TransitionOut; @@ -56,7 +63,7 @@ //! todo!() //! } //! -//! #[derive(Transition)] +//! #[derive(Clone, Transition)] //! enum D { //! _1(_1State), //! _2(_2State), @@ -69,7 +76,7 @@ //! } //! } //! -//! type In = DialogueWithCx, Message, D, Infallible>; +//! type In = DialogueWithCx, Message, D, InMemStorageError>; //! //! #[tokio::main] //! async fn main() { @@ -168,4 +175,4 @@ pub use storage::{RedisStorage, RedisStorageError}; #[cfg(feature = "sqlite-storage")] pub use storage::{SqliteStorage, SqliteStorageError}; -pub use storage::{serializer, InMemStorage, Serializer, Storage, TraceStorage}; +pub use storage::{serializer, InMemStorage, InMemStorageError, Serializer, Storage, TraceStorage}; diff --git a/src/dispatching/dialogue/storage/in_mem_storage.rs b/src/dispatching/dialogue/storage/in_mem_storage.rs index 468a305e..d26a21eb 100644 --- a/src/dispatching/dialogue/storage/in_mem_storage.rs +++ b/src/dispatching/dialogue/storage/in_mem_storage.rs @@ -1,18 +1,23 @@ use super::Storage; use futures::future::BoxFuture; use std::{collections::HashMap, sync::Arc}; +use thiserror::Error; use tokio::sync::Mutex; -/// A memory storage based on a hash map. Stores all the dialogues directly in -/// RAM. +/// An error returned from [`InMemStorage`]. +#[derive(Debug, Error)] +pub enum InMemStorageError { + /// Returned from [`InMemStorage::remove_dialogue`]. + #[error("row not found")] + DialogueNotFound, +} + +/// A dialogue storage based on [`std::collections::HashMap`]. /// /// ## Note -/// All the dialogues will be lost after you restart your bot. If you need to -/// store them somewhere on a drive, you should use [`SqliteStorage`], -/// [`RedisStorage`] or implement your own. -/// -/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage -/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage +/// All your dialogues will be lost after you restart your bot. If you need to +/// store them somewhere on a drive, you should use e.g. +/// [`super::SqliteStorage`] or implement your own. #[derive(Debug)] pub struct InMemStorage { map: Mutex>, @@ -25,27 +30,44 @@ impl InMemStorage { } } -impl Storage for InMemStorage { - type Error = std::convert::Infallible; +impl Storage for InMemStorage +where + D: Clone, + D: Send + 'static, +{ + type Error = InMemStorageError; - fn remove_dialogue( - self: Arc, - chat_id: i64, - ) -> BoxFuture<'static, Result, Self::Error>> + fn remove_dialogue(self: Arc, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>> where D: Send + 'static, { - Box::pin(async move { Ok(self.map.lock().await.remove(&chat_id)) }) + Box::pin(async move { + self.map + .lock() + .await + .remove(&chat_id) + .map_or(Err(InMemStorageError::DialogueNotFound), |_| Ok(())) + }) } fn update_dialogue( self: Arc, chat_id: i64, dialogue: D, - ) -> BoxFuture<'static, Result, Self::Error>> + ) -> BoxFuture<'static, Result<(), Self::Error>> where D: Send + 'static, { - Box::pin(async move { Ok(self.map.lock().await.insert(chat_id, dialogue)) }) + Box::pin(async move { + self.map.lock().await.insert(chat_id, dialogue); + Ok(()) + }) + } + + fn get_dialogue( + self: Arc, + chat_id: i64, + ) -> BoxFuture<'static, Result, Self::Error>> { + Box::pin(async move { Ok(self.map.lock().await.get(&chat_id).map(ToOwned::to_owned)) }) } } diff --git a/src/dispatching/dialogue/storage/mod.rs b/src/dispatching/dialogue/storage/mod.rs index 31664425..dbf4c25a 100644 --- a/src/dispatching/dialogue/storage/mod.rs +++ b/src/dispatching/dialogue/storage/mod.rs @@ -11,7 +11,10 @@ mod sqlite_storage; use futures::future::BoxFuture; -pub use self::{in_mem_storage::InMemStorage, trace_storage::TraceStorage}; +pub use self::{ + in_mem_storage::{InMemStorage, InMemStorageError}, + trace_storage::TraceStorage, +}; #[cfg(feature = "redis-storage")] #[cfg_attr(all(docsrs, feature = "nightly"), doc(cfg(feature = "redis-storage")))] @@ -27,11 +30,14 @@ pub use sqlite_storage::{SqliteStorage, SqliteStorageError}; /// You can implement this trait for a structure that communicates with a DB and /// be sure that after you restart your bot, all the dialogues won't be lost. /// +/// `Storage` is used only to store dialogue states, i.e. it can't be used as a +/// generic database. +/// /// Currently we support the following storages out of the box: /// -/// - [`InMemStorage`] - a storage based on a simple hash map -/// - [`RedisStorage`] - a Redis-based storage -/// - [`SqliteStorage`] - an SQLite-based persistent storage +/// - [`InMemStorage`] -- a storage based on [`std::collections::HashMap`]. +/// - [`RedisStorage`] -- a Redis-based storage. +/// - [`SqliteStorage`] -- an SQLite-based persistent storage. /// /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage /// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage @@ -39,26 +45,32 @@ pub use sqlite_storage::{SqliteStorage, SqliteStorageError}; pub trait Storage { type Error; - /// Removes a dialogue with the specified `chat_id`. + /// Removes a dialogue indexed by `chat_id`. /// - /// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a - /// `dialogue` was deleted. + /// If the dialogue indexed by `chat_id` does not exist, this function + /// results in an error. + #[must_use = "Futures are lazy and do nothing unless polled with .await"] fn remove_dialogue( self: Arc, chat_id: i64, - ) -> BoxFuture<'static, Result, Self::Error>> + ) -> BoxFuture<'static, Result<(), Self::Error>> where D: Send + 'static; - /// Updates a dialogue with the specified `chat_id`. - /// - /// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a - /// `dialogue` was updated. + /// Updates a dialogue indexed by `chat_id` with `dialogue`. + #[must_use = "Futures are lazy and do nothing unless polled with .await"] fn update_dialogue( self: Arc, chat_id: i64, dialogue: D, - ) -> BoxFuture<'static, Result, Self::Error>> + ) -> BoxFuture<'static, Result<(), Self::Error>> where D: Send + 'static; + + /// Returns the dialogue indexed by `chat_id`. + #[must_use = "Futures are lazy and do nothing unless polled with .await"] + fn get_dialogue( + self: Arc, + chat_id: i64, + ) -> BoxFuture<'static, Result, Self::Error>>; } diff --git a/src/dispatching/dialogue/storage/redis_storage.rs b/src/dispatching/dialogue/storage/redis_storage.rs index 31a358e8..5e2eb843 100644 --- a/src/dispatching/dialogue/storage/redis_storage.rs +++ b/src/dispatching/dialogue/storage/redis_storage.rs @@ -1,6 +1,6 @@ use super::{serializer::Serializer, Storage}; use futures::future::BoxFuture; -use redis::{AsyncCommands, FromRedisValue, IntoConnectionInfo}; +use redis::{AsyncCommands, IntoConnectionInfo}; use serde::{de::DeserializeOwned, Serialize}; use std::{ convert::Infallible, @@ -12,8 +12,6 @@ use thiserror::Error; use tokio::sync::Mutex; /// An error returned from [`RedisStorage`]. -/// -/// [`RedisStorage`]: struct.RedisStorage.html #[derive(Debug, Error)] pub enum RedisStorageError where @@ -21,11 +19,16 @@ where { #[error("parsing/serializing error: {0}")] SerdeError(SE), + #[error("error from Redis: {0}")] RedisError(#[from] redis::RedisError), + + /// Returned from [`RedisStorage::remove_dialogue`]. + #[error("row not found")] + DialogueNotFound, } -/// A memory storage based on [Redis](https://redis.io/). +/// A dialogue storage based on [Redis](https://redis.io/). pub struct RedisStorage { conn: Mutex, serializer: S, @@ -51,35 +54,27 @@ where { type Error = RedisStorageError<>::Error>; - // `.del().ignore()` is much more readable than `.del()\n.ignore()` - #[rustfmt::skip] fn remove_dialogue( self: Arc, chat_id: i64, - ) -> BoxFuture<'static, Result, Self::Error>> { + ) -> BoxFuture<'static, Result<(), Self::Error>> { Box::pin(async move { - let res = redis::pipe() + let deleted_rows_count = redis::pipe() .atomic() - .get(chat_id) - .del(chat_id).ignore() - .query_async::<_, redis::Value>( - self.conn.lock().await.deref_mut(), - ) + .del(chat_id) + .query_async::<_, redis::Value>(self.conn.lock().await.deref_mut()) .await?; - // We're expecting `.pipe()` to return us an exactly one result in - // bulk, so all other branches should be unreachable - match res { - redis::Value::Bulk(bulk) if bulk.len() == 1 => { - Ok(Option::>::from_redis_value(&bulk[0])? - .map(|v| { - self.serializer - .deserialize(&v) - .map_err(RedisStorageError::SerdeError) - }) - .transpose()?) + + if let redis::Value::Bulk(values) = deleted_rows_count { + if let redis::Value::Int(deleted_rows_count) = values[0] { + match deleted_rows_count { + 0 => return Err(RedisStorageError::DialogueNotFound), + _ => return Ok(()), + } } - _ => unreachable!(), } + + unreachable!("Must return redis::Value::Bulk(redis::Value::Int(_))"); }) } @@ -87,14 +82,24 @@ where self: Arc, chat_id: i64, dialogue: D, - ) -> BoxFuture<'static, Result, Self::Error>> { + ) -> BoxFuture<'static, Result<(), Self::Error>> { Box::pin(async move { let dialogue = self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?; + self.conn.lock().await.set::<_, Vec, _>(chat_id, dialogue).await?; + Ok(()) + }) + } + + fn get_dialogue( + self: Arc, + chat_id: i64, + ) -> BoxFuture<'static, Result, Self::Error>> { + Box::pin(async move { self.conn .lock() .await - .getset::<_, Vec, Option>>(chat_id, dialogue) + .get::<_, Option>>(chat_id) .await? .map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError)) .transpose() diff --git a/src/dispatching/dialogue/storage/serializer.rs b/src/dispatching/dialogue/storage/serializer.rs index 2e0aa945..2fb30cbc 100644 --- a/src/dispatching/dialogue/storage/serializer.rs +++ b/src/dispatching/dialogue/storage/serializer.rs @@ -1,4 +1,4 @@ -//! Various serializers for memory storages. +//! Various serializers for dialogue storages. use serde::{de::DeserializeOwned, ser::Serialize}; diff --git a/src/dispatching/dialogue/storage/sqlite_storage.rs b/src/dispatching/dialogue/storage/sqlite_storage.rs index f4e4d98c..a562b5e5 100644 --- a/src/dispatching/dialogue/storage/sqlite_storage.rs +++ b/src/dispatching/dialogue/storage/sqlite_storage.rs @@ -10,15 +10,13 @@ use std::{ }; use thiserror::Error; -/// A persistent storage based on [SQLite](https://www.sqlite.org/). +/// A persistent dialogue storage based on [SQLite](https://www.sqlite.org/). pub struct SqliteStorage { pool: SqlitePool, serializer: S, } /// An error returned from [`SqliteStorage`]. -/// -/// [`SqliteStorage`]: struct.SqliteStorage.html #[derive(Debug, Error)] pub enum SqliteStorageError where @@ -26,8 +24,13 @@ where { #[error("dialogue serialization error: {0}")] SerdeError(SE), + #[error("sqlite error: {0}")] SqliteError(#[from] sqlx::Error), + + /// Returned from [`SqliteStorage::remove_dialogue`]. + #[error("row not found")] + DialogueNotFound, } impl SqliteStorage { @@ -60,23 +63,24 @@ where { type Error = SqliteStorageError<>::Error>; + /// Returns [`sqlx::Error::RowNotFound`] if a dialogue does not exist. fn remove_dialogue( self: Arc, chat_id: i64, - ) -> BoxFuture<'static, Result, Self::Error>> { + ) -> BoxFuture<'static, Result<(), Self::Error>> { Box::pin(async move { - Ok(match get_dialogue(&self.pool, chat_id).await? { - Some(d) => { - let prev_dialogue = - self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)?; - sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?") - .bind(chat_id) - .execute(&self.pool) - .await?; - Some(prev_dialogue) - } - _ => None, - }) + let deleted_rows_count = + sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?") + .bind(chat_id) + .execute(&self.pool) + .await? + .rows_affected(); + + if deleted_rows_count == 0 { + return Err(SqliteStorageError::DialogueNotFound); + } + + Ok(()) }) } @@ -84,14 +88,10 @@ where self: Arc, chat_id: i64, dialogue: D, - ) -> BoxFuture<'static, Result, Self::Error>> { + ) -> BoxFuture<'static, Result<(), Self::Error>> { Box::pin(async move { - let prev_dialogue = get_dialogue(&self.pool, chat_id) - .await? - .map(|d| self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)) - .transpose()?; - let upd_dialogue = - self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?; + let d = self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?; + self.pool .acquire() .await? @@ -103,28 +103,39 @@ where "#, ) .bind(chat_id) - .bind(upd_dialogue), + .bind(d), ) .await?; - Ok(prev_dialogue) + Ok(()) + }) + } + + fn get_dialogue( + self: Arc, + chat_id: i64, + ) -> BoxFuture<'static, Result, Self::Error>> { + Box::pin(async move { + get_dialogue(&self.pool, chat_id) + .await? + .map(|d| self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)) + .transpose() }) } } -#[derive(sqlx::FromRow)] -struct DialogueDbRow { - dialogue: Vec, -} +async fn get_dialogue(pool: &SqlitePool, chat_id: i64) -> Result>, sqlx::Error> { + #[derive(sqlx::FromRow)] + struct DialogueDbRow { + dialogue: Vec, + } -async fn get_dialogue( - pool: &SqlitePool, - chat_id: i64, -) -> Result>>, sqlx::Error> { - Ok(sqlx::query_as::<_, DialogueDbRow>( + let bytes = sqlx::query_as::<_, DialogueDbRow>( "SELECT dialogue FROM teloxide_dialogues WHERE chat_id = ?", ) .bind(chat_id) .fetch_optional(pool) .await? - .map(|r| Box::new(r.dialogue))) + .map(|r| r.dialogue); + + Ok(bytes) } diff --git a/src/dispatching/dialogue/storage/trace_storage.rs b/src/dispatching/dialogue/storage/trace_storage.rs index 2e28263f..f4e22d8e 100644 --- a/src/dispatching/dialogue/storage/trace_storage.rs +++ b/src/dispatching/dialogue/storage/trace_storage.rs @@ -5,14 +5,13 @@ use std::{ }; use futures::future::BoxFuture; -use log::{log_enabled, trace, Level::Trace}; use crate::dispatching::dialogue::Storage; -/// Storage wrapper for logging purposes +/// A dialogue storage wrapper which logs all actions performed on an underlying +/// storage. /// -/// Reports about any dialogue update or removal action on `trace` level -/// using `log` crate. +/// Reports about any dialogue action via [`log::Level::Trace`]. pub struct TraceStorage { inner: Arc, } @@ -35,14 +34,11 @@ where { type Error = >::Error; - fn remove_dialogue( - self: Arc, - chat_id: i64, - ) -> BoxFuture<'static, Result, Self::Error>> + fn remove_dialogue(self: Arc, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>> where D: Send + 'static, { - trace!("Removing dialogue with {}", chat_id); + log::trace!("Removing dialogue #{}", chat_id); >::remove_dialogue(self.inner.clone(), chat_id) } @@ -50,21 +46,23 @@ where self: Arc, chat_id: i64, dialogue: D, - ) -> BoxFuture<'static, Result, Self::Error>> + ) -> BoxFuture<'static, Result<(), Self::Error>> where D: Send + 'static, { - if log_enabled!(Trace) { - Box::pin(async move { - let to = format!("{:#?}", dialogue); - let from = - >::update_dialogue(self.inner.clone(), chat_id, dialogue) - .await?; - trace!("Updated dialogue with {}, {:#?} -> {}", chat_id, from, to); - Ok(from) - }) - } else { - >::update_dialogue(self.inner.clone(), chat_id, dialogue) - } + Box::pin(async move { + let to = format!("{:#?}", dialogue); + >::update_dialogue(self.inner.clone(), chat_id, dialogue).await?; + log::trace!("Updated a dialogue #{}: {:#?}", chat_id, to); + Ok(()) + }) + } + + fn get_dialogue( + self: Arc, + chat_id: i64, + ) -> BoxFuture<'static, Result, Self::Error>> { + log::trace!("Requested a dialogue #{}", chat_id); + >::get_dialogue(self.inner.clone(), chat_id) } } diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index e7635ebb..ff603c05 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -1,48 +1,37 @@ +use std::{ + fmt::{self, Debug}, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + time::Duration, +}; + use crate::{ dispatching::{ - update_listeners, update_listeners::UpdateListener, DispatcherHandler, UpdateWithCx, + stop_token::StopToken, + update_listeners::{self, UpdateListener}, + DispatcherHandler, UpdateWithCx, }, error_handlers::{ErrorHandler, LoggingErrorHandler}, }; -use futures::StreamExt; -use std::{fmt::Debug, sync::Arc}; + +use futures::{stream::FuturesUnordered, Future, StreamExt}; use teloxide_core::{ requests::Requester, types::{ - CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll, - PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind, + AllowedUpdate, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, + Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, }, }; -use tokio::sync::mpsc; +use tokio::{ + sync::{mpsc, Notify}, + task::JoinHandle, + time::timeout, +}; type Tx = Option>>; -#[macro_use] -mod macros { - /// Pushes an update to a queue. - macro_rules! send { - ($requester:expr, $tx:expr, $update:expr, $variant:expr) => { - send($requester, $tx, $update, stringify!($variant)); - }; - } -} - -fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx, update: Upd, variant: &'static str) -where - Upd: Debug, - R: Requester + Clone, -{ - if let Some(tx) = tx { - if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) { - log::error!( - "The RX part of the {} channel is closed, but an update is received.\nError:{}\n", - variant, - error - ); - } - } -} - /// One dispatcher to rule them all. /// /// See the [module-level documentation](crate::dispatching) for the design @@ -63,6 +52,11 @@ pub struct Dispatcher { poll_answers_queue: Tx, my_chat_members_queue: Tx, chat_members_queue: Tx, + + running_handlers: FuturesUnordered>, + + state: Arc, + shutdown_notify_back: Arc, } impl Dispatcher @@ -87,25 +81,48 @@ where poll_answers_queue: None, my_chat_members_queue: None, chat_members_queue: None, + running_handlers: FuturesUnordered::new(), + state: <_>::default(), + shutdown_notify_back: <_>::default(), } } #[must_use] - #[allow(clippy::unnecessary_wraps)] - fn new_tx(&self, h: H) -> Tx + fn new_tx(&mut self, h: H) -> Tx where H: DispatcherHandler + Send + 'static, Upd: Send + 'static, R: Send + 'static, { let (tx, rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { - let fut = h.handle(rx); - fut.await; - }); + let join_handle = tokio::spawn(h.handle(rx)); + + self.running_handlers.push(join_handle); + Some(tx) } + /// Setup the `^C` handler which [`shutdown`]s dispatching. + /// + /// [`shutdown`]: ShutdownToken::shutdown + #[cfg(feature = "ctrlc_handler")] + #[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))] + pub fn setup_ctrlc_handler(self) -> Self { + let state = Arc::clone(&self.state); + tokio::spawn(async move { + loop { + tokio::signal::ctrl_c().await.expect("Failed to listen for ^C"); + + log::info!("^C received, trying to shutdown the dispatcher..."); + + // If dispatcher wasn't running, then there is nothing to do + shutdown_inner(&state).ok(); + } + }); + + self + } + #[must_use] pub fn messages_handler(mut self, h: H) -> Self where @@ -227,23 +244,39 @@ where /// /// The default parameters are a long polling update listener and log all /// errors produced by this listener). - pub async fn dispatch(&self) + /// + /// Please note that after shutting down (either because of [`shutdown`], + /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers + /// will be gone. As such, to restart listening you need to re-add + /// handlers. + /// + /// [`shutdown`]: ShutdownToken::shutdown + /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler + pub async fn dispatch(&mut self) where R: Requester + Clone, ::GetUpdatesFaultTolerant: Send, { - self.dispatch_with_listener( - update_listeners::polling_default(self.requester.clone()), - LoggingErrorHandler::with_custom_text("An error from the update listener"), - ) - .await; + let listener = update_listeners::polling_default(self.requester.clone()).await; + let error_handler = + LoggingErrorHandler::with_custom_text("An error from the update listener"); + + self.dispatch_with_listener(listener, error_handler).await; } /// Starts your bot with custom `update_listener` and /// `update_listener_error_handler`. + /// + /// Please note that after shutting down (either because of [`shutdown`], + /// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers + /// will be gone. As such, to restart listening you need to re-add + /// handlers. + /// + /// [`shutdown`]: ShutdownToken::shutdown + /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( - &'a self, - update_listener: UListener, + &'a mut self, + mut update_listener: UListener, update_listener_error_handler: Arc, ) where UListener: UpdateListener + 'a, @@ -251,126 +284,365 @@ where ListenerE: Debug, R: Requester + Clone, { - let update_listener = Box::pin(update_listener); + use ShutdownState::*; - update_listener - .for_each(move |update| { - let update_listener_error_handler = Arc::clone(&update_listener_error_handler); + self.hint_allowed_updates(&mut update_listener); - async move { - log::trace!("Dispatcher received an update: {:?}", update); + let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener); + let mut stop_token = Some(update_listener.stop_token()); - let update = match update { - Ok(update) => update, - Err(error) => { - Arc::clone(&update_listener_error_handler).handle_error(error).await; - return; - } - }; + if let Err(actual) = self.state.compare_exchange(Idle, Running) { + unreachable!( + "Dispatching is already running: expected `{:?}` state, found `{:?}`", + Idle, actual + ); + } - match update.kind { - UpdateKind::Message(message) => { - send!( - &self.requester, - &self.messages_queue, - message, - UpdateKind::Message - ); - } - UpdateKind::EditedMessage(message) => { - send!( - &self.requester, - &self.edited_messages_queue, - message, - UpdateKind::EditedMessage - ); - } - UpdateKind::ChannelPost(post) => { - send!( - &self.requester, - &self.channel_posts_queue, - post, - UpdateKind::ChannelPost - ); - } - UpdateKind::EditedChannelPost(post) => { - send!( - &self.requester, - &self.edited_channel_posts_queue, - post, - UpdateKind::EditedChannelPost - ); - } - UpdateKind::InlineQuery(query) => { - send!( - &self.requester, - &self.inline_queries_queue, - query, - UpdateKind::InlineQuery - ); - } - UpdateKind::ChosenInlineResult(result) => { - send!( - &self.requester, - &self.chosen_inline_results_queue, - result, - UpdateKind::ChosenInlineResult - ); - } - UpdateKind::CallbackQuery(query) => { - send!( - &self.requester, - &self.callback_queries_queue, - query, - UpdateKind::CallbackQuer - ); - } - UpdateKind::ShippingQuery(query) => { - send!( - &self.requester, - &self.shipping_queries_queue, - query, - UpdateKind::ShippingQuery - ); - } - UpdateKind::PreCheckoutQuery(query) => { - send!( - &self.requester, - &self.pre_checkout_queries_queue, - query, - UpdateKind::PreCheckoutQuery - ); - } - UpdateKind::Poll(poll) => { - send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll); - } - UpdateKind::PollAnswer(answer) => { - send!( - &self.requester, - &self.poll_answers_queue, - answer, - UpdateKind::PollAnswer - ); - } - UpdateKind::MyChatMember(chat_member_updated) => { - send!( - &self.requester, - &self.my_chat_members_queue, - chat_member_updated, - UpdateKind::MyChatMember - ); - } - UpdateKind::ChatMember(chat_member_updated) => { - send!( - &self.requester, - &self.chat_members_queue, - chat_member_updated, - UpdateKind::MyChatMember - ); - } + { + let stream = update_listener.as_stream(); + tokio::pin!(stream); + + loop { + if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await { + match upd { + None => break, + Some(upd) => self.process_update(upd, &update_listener_error_handler).await, } } - }) - .await + + if let ShuttingDown = self.state.load() { + if let Some(token) = stop_token.take() { + log::debug!("Start shutting down dispatching..."); + token.stop(); + } + } + } + } + + self.wait_for_handlers().await; + + if let ShuttingDown = self.state.load() { + // Stopped because of a `shutdown` call. + + // Notify `shutdown`s that we finished + self.shutdown_notify_back.notify_waiters(); + log::info!("Dispatching has been shut down."); + } else { + log::info!("Dispatching has been stopped (listener returned `None`)."); + } + + self.state.store(Idle); + } + + /// Returns a shutdown token, which can later be used to shutdown + /// dispatching. + pub fn shutdown_token(&self) -> ShutdownToken { + ShutdownToken { + dispatcher_state: Arc::clone(&self.state), + shutdown_notify_back: Arc::clone(&self.shutdown_notify_back), + } + } + + async fn process_update( + &self, + update: Result, + update_listener_error_handler: &Arc, + ) where + R: Requester + Clone, + Eh: ErrorHandler, + ListenerE: Debug, + { + { + log::trace!("Dispatcher received an update: {:?}", update); + + let update = match update { + Ok(update) => update, + Err(error) => { + Arc::clone(update_listener_error_handler).handle_error(error).await; + return; + } + }; + + match update.kind { + UpdateKind::Message(message) => { + send(&self.requester, &self.messages_queue, message, "UpdateKind::Message") + } + UpdateKind::EditedMessage(message) => send( + &self.requester, + &self.edited_messages_queue, + message, + "UpdateKind::EditedMessage", + ), + UpdateKind::ChannelPost(post) => send( + &self.requester, + &self.channel_posts_queue, + post, + "UpdateKind::ChannelPost", + ), + UpdateKind::EditedChannelPost(post) => send( + &self.requester, + &self.edited_channel_posts_queue, + post, + "UpdateKind::EditedChannelPost", + ), + UpdateKind::InlineQuery(query) => send( + &self.requester, + &self.inline_queries_queue, + query, + "UpdateKind::InlineQuery", + ), + UpdateKind::ChosenInlineResult(result) => send( + &self.requester, + &self.chosen_inline_results_queue, + result, + "UpdateKind::ChosenInlineResult", + ), + UpdateKind::CallbackQuery(query) => send( + &self.requester, + &self.callback_queries_queue, + query, + "UpdateKind::CallbackQuer", + ), + UpdateKind::ShippingQuery(query) => send( + &self.requester, + &self.shipping_queries_queue, + query, + "UpdateKind::ShippingQuery", + ), + UpdateKind::PreCheckoutQuery(query) => send( + &self.requester, + &self.pre_checkout_queries_queue, + query, + "UpdateKind::PreCheckoutQuery", + ), + UpdateKind::Poll(poll) => { + send(&self.requester, &self.polls_queue, poll, "UpdateKind::Poll") + } + UpdateKind::PollAnswer(answer) => send( + &self.requester, + &self.poll_answers_queue, + answer, + "UpdateKind::PollAnswer", + ), + UpdateKind::MyChatMember(chat_member_updated) => send( + &self.requester, + &self.my_chat_members_queue, + chat_member_updated, + "UpdateKind::MyChatMember", + ), + UpdateKind::ChatMember(chat_member_updated) => send( + &self.requester, + &self.chat_members_queue, + chat_member_updated, + "UpdateKind::MyChatMember", + ), + } + } + } + + fn hint_allowed_updates(&self, listener: &mut impl UpdateListener) { + fn hint_handler_allowed_update( + queue: &Option, + kind: AllowedUpdate, + ) -> std::option::IntoIter { + queue.as_ref().map(|_| kind).into_iter() + } + + let mut allowed = hint_handler_allowed_update(&self.messages_queue, AllowedUpdate::Message) + .chain(hint_handler_allowed_update( + &self.edited_messages_queue, + AllowedUpdate::EditedMessage, + )) + .chain(hint_handler_allowed_update( + &self.channel_posts_queue, + AllowedUpdate::ChannelPost, + )) + .chain(hint_handler_allowed_update( + &self.edited_channel_posts_queue, + AllowedUpdate::EditedChannelPost, + )) + .chain(hint_handler_allowed_update( + &self.inline_queries_queue, + AllowedUpdate::InlineQuery, + )) + .chain(hint_handler_allowed_update( + &self.chosen_inline_results_queue, + AllowedUpdate::ChosenInlineResult, + )) + .chain(hint_handler_allowed_update( + &self.callback_queries_queue, + AllowedUpdate::CallbackQuery, + )) + .chain(hint_handler_allowed_update( + &self.shipping_queries_queue, + AllowedUpdate::ShippingQuery, + )) + .chain(hint_handler_allowed_update( + &self.pre_checkout_queries_queue, + AllowedUpdate::PreCheckoutQuery, + )) + .chain(hint_handler_allowed_update(&self.polls_queue, AllowedUpdate::Poll)) + .chain(hint_handler_allowed_update(&self.poll_answers_queue, AllowedUpdate::PollAnswer)) + .chain(hint_handler_allowed_update( + &self.my_chat_members_queue, + AllowedUpdate::MyChatMember, + )) + .chain(hint_handler_allowed_update( + &self.chat_members_queue, + AllowedUpdate::ChatMember, + )); + + listener.hint_allowed_updates(&mut allowed); + } + + async fn wait_for_handlers(&mut self) { + log::debug!("Waiting for handlers to finish"); + + // Drop all senders, so handlers can stop + self.messages_queue.take(); + self.edited_messages_queue.take(); + self.channel_posts_queue.take(); + self.edited_channel_posts_queue.take(); + self.inline_queries_queue.take(); + self.chosen_inline_results_queue.take(); + self.callback_queries_queue.take(); + self.shipping_queries_queue.take(); + self.pre_checkout_queries_queue.take(); + self.polls_queue.take(); + self.poll_answers_queue.take(); + self.my_chat_members_queue.take(); + self.chat_members_queue.take(); + + // Wait untill all handlers finish + self.running_handlers.by_ref().for_each(|_| async {}).await; + } +} + +/// This error is returned from [`ShutdownToken::shutdown`] when trying to +/// shutdown an idle [`Dispatcher`]. +#[derive(Debug)] +pub struct IdleShutdownError; + +impl fmt::Display for IdleShutdownError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Dispatcher was idle and as such couldn't be shut down") + } +} + +impl std::error::Error for IdleShutdownError {} + +/// A token which used to shutdown [`Dispatcher`]. +#[derive(Clone)] +pub struct ShutdownToken { + dispatcher_state: Arc, + shutdown_notify_back: Arc, +} + +impl ShutdownToken { + /// Tries to shutdown dispatching. + /// + /// Returns an error if the dispatcher is idle at the moment. + /// + /// If you don't need to wait for shutdown, the returned future can be + /// ignored. + pub fn shutdown(&self) -> Result + '_, IdleShutdownError> { + shutdown_inner(&self.dispatcher_state).map(|()| async move { + log::info!("Trying to shutdown the dispatcher..."); + self.shutdown_notify_back.notified().await + }) + } +} + +struct DispatcherState { + inner: AtomicU8, +} + +impl DispatcherState { + fn load(&self) -> ShutdownState { + ShutdownState::from_u8(self.inner.load(Ordering::SeqCst)) + } + + fn store(&self, new: ShutdownState) { + self.inner.store(new as _, Ordering::SeqCst) + } + + fn compare_exchange( + &self, + current: ShutdownState, + new: ShutdownState, + ) -> Result { + self.inner + .compare_exchange(current as _, new as _, Ordering::SeqCst, Ordering::SeqCst) + .map(ShutdownState::from_u8) + .map_err(ShutdownState::from_u8) + } +} + +impl Default for DispatcherState { + fn default() -> Self { + Self { inner: AtomicU8::new(ShutdownState::Idle as _) } + } +} + +#[repr(u8)] +#[derive(Debug)] +enum ShutdownState { + Running, + ShuttingDown, + Idle, +} + +impl ShutdownState { + fn from_u8(n: u8) -> Self { + const RUNNING: u8 = ShutdownState::Running as u8; + const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8; + const IDLE: u8 = ShutdownState::Idle as u8; + + match n { + RUNNING => ShutdownState::Running, + SHUTTING_DOWN => ShutdownState::ShuttingDown, + IDLE => ShutdownState::Idle, + _ => unreachable!(), + } + } +} + +fn shutdown_check_timeout_for(update_listener: &impl UpdateListener) -> Duration { + const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1); + + // FIXME: replace this by just Duration::ZERO once 1.53 will be released + const DZERO: Duration = Duration::from_secs(0); + + let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO); + + // FIXME: replace this by just saturating_add once 1.53 will be released + shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout) +} + +fn shutdown_inner(state: &DispatcherState) -> Result<(), IdleShutdownError> { + use ShutdownState::*; + + let res = state.compare_exchange(Running, ShuttingDown); + + match res { + Ok(_) | Err(ShuttingDown) => Ok(()), + Err(Idle) => Err(IdleShutdownError), + Err(Running) => unreachable!(), + } +} + +fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx, update: Upd, variant: &'static str) +where + Upd: Debug, + R: Requester + Clone, +{ + if let Some(tx) = tx { + if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) { + log::error!( + "The RX part of the {} channel is closed, but an update is received.\nError:{}\n", + variant, + error + ); + } } } diff --git a/src/dispatching/mod.rs b/src/dispatching/mod.rs index 651cae53..9936de96 100644 --- a/src/dispatching/mod.rs +++ b/src/dispatching/mod.rs @@ -27,7 +27,7 @@ //! that: //! - You are able to supply [`DialogueDispatcher`] as a handler. //! - You are able to supply functions that accept -//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future` //! as a handler. //! //! Since they implement [`DispatcherHandler`] too. @@ -46,14 +46,17 @@ //! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot pub mod dialogue; +pub mod stop_token; +pub mod update_listeners; + +pub(crate) mod repls; + mod dispatcher; mod dispatcher_handler; mod dispatcher_handler_rx_ext; -pub(crate) mod repls; -pub mod update_listeners; mod update_with_cx; -pub use dispatcher::Dispatcher; +pub use dispatcher::{Dispatcher, IdleShutdownError, ShutdownToken}; pub use dispatcher_handler::DispatcherHandler; pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt; use tokio::sync::mpsc::UnboundedReceiver; diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index 249d4861..a3625910 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -22,6 +22,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher +#[cfg(feature = "ctrlc_handler")] pub async fn commands_repl(requester: R, bot_name: N, handler: H) where Cmd: BotCommand + Send + 'static, @@ -39,7 +40,7 @@ where requester, bot_name, handler, - update_listeners::polling_default(cloned_requester), + update_listeners::polling_default(cloned_requester).await, ) .await; } @@ -56,6 +57,7 @@ where /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`commands_repl`]: crate::dispatching::repls::commands_repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +#[cfg(feature = "ctrlc_handler")] pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>( requester: R, bot_name: N, @@ -87,6 +89,7 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, Handl }, ) }) + .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/repls/dialogues_repl.rs b/src/dispatching/repls/dialogues_repl.rs index 706d26a1..755d4dd3 100644 --- a/src/dispatching/repls/dialogues_repl.rs +++ b/src/dispatching/repls/dialogues_repl.rs @@ -1,13 +1,13 @@ use crate::{ dispatching::{ - dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx}, + dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx, InMemStorageError}, update_listeners, update_listeners::UpdateListener, Dispatcher, UpdateWithCx, }, error_handlers::LoggingErrorHandler, }; -use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc}; +use std::{fmt::Debug, future::Future, sync::Arc}; use teloxide_core::{requests::Requester, types::Message}; /// A [REPL] for dialogues. @@ -23,10 +23,11 @@ use teloxide_core::{requests::Requester, types::Message}; /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage +#[cfg(feature = "ctrlc_handler")] pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H) where H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, - D: Default + Send + 'static, + D: Clone + Default + Send + 'static, Fut: Future> + Send + 'static, R: Requester + Send + Clone + 'static, ::GetUpdatesFaultTolerant: Send, @@ -36,7 +37,7 @@ where dialogues_repl_with_listener( requester, handler, - update_listeners::polling_default(cloned_requester), + update_listeners::polling_default(cloned_requester).await, ) .await; } @@ -55,13 +56,14 @@ where /// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage +#[cfg(feature = "ctrlc_handler")] pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( requester: R, handler: H, listener: L, ) where H: Fn(UpdateWithCx, D) -> Fut + Send + Sync + 'static, - D: Default + Send + 'static, + D: Clone + Default + Send + 'static, Fut: Future> + Send + 'static, L: UpdateListener + Send + 'a, ListenerE: Debug + Send + 'a, @@ -71,7 +73,12 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( Dispatcher::new(requester) .messages_handler(DialogueDispatcher::new( - move |DialogueWithCx { cx, dialogue }: DialogueWithCx| { + move |DialogueWithCx { cx, dialogue }: DialogueWithCx< + R, + Message, + D, + InMemStorageError, + >| { let handler = Arc::clone(&handler); async move { @@ -80,6 +87,7 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>( } }, )) + .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index 31075f60..a298e768 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -21,6 +21,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// /// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop /// [`Dispatcher`]: crate::dispatching::Dispatcher +#[cfg(feature = "ctrlc_handler")] pub async fn repl(requester: R, handler: H) where H: Fn(UpdateWithCx) -> Fut + Send + Sync + 'static, @@ -31,8 +32,12 @@ where ::GetUpdatesFaultTolerant: Send, { let cloned_requester = requester.clone(); - repl_with_listener(requester, handler, update_listeners::polling_default(cloned_requester)) - .await; + repl_with_listener( + requester, + handler, + update_listeners::polling_default(cloned_requester).await, + ) + .await; } /// Like [`repl`], but with a custom [`UpdateListener`]. @@ -47,6 +52,7 @@ where /// [`Dispatcher`]: crate::dispatching::Dispatcher /// [`repl`]: crate::dispatching::repls::repl() /// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener +#[cfg(feature = "ctrlc_handler")] pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>( requester: R, handler: H, @@ -72,6 +78,7 @@ pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>( } }) }) + .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/stop_token.rs b/src/dispatching/stop_token.rs new file mode 100644 index 00000000..4198d4c0 --- /dev/null +++ b/src/dispatching/stop_token.rs @@ -0,0 +1,76 @@ +//! A stop token used to stop a listener. + +use std::{future::Future, pin::Pin, task}; + +use futures::future::{pending, AbortHandle, Abortable, Pending}; + +/// A stop token allows you to stop a listener. +/// +/// See also: [`UpdateListener::stop_token`]. +/// +/// [`UpdateListener::stop_token`]: +/// crate::dispatching::update_listeners::UpdateListener::stop_token +pub trait StopToken { + /// Stop the listener linked to this token. + fn stop(self); +} + +/// A stop token which does nothing. May be used in prototyping or in cases +/// where you do not care about graceful shutdowning. +pub struct Noop; + +impl StopToken for Noop { + fn stop(self) {} +} + +/// A stop token which corresponds to [`AsyncStopFlag`]. +#[derive(Clone)] +pub struct AsyncStopToken(AbortHandle); + +/// A flag which corresponds to [`AsyncStopToken`]. +/// +/// To know if the stop token was used you can either repeatedly call +/// [`is_stopped`] or use this type as a `Future`. +/// +/// [`is_stopped`]: AsyncStopFlag::is_stopped +#[pin_project::pin_project] +pub struct AsyncStopFlag(#[pin] Abortable>); + +impl AsyncStopToken { + /// Create a new token/flag pair. + pub fn new_pair() -> (Self, AsyncStopFlag) { + let (handle, reg) = AbortHandle::new_pair(); + let token = Self(handle); + let flag = AsyncStopFlag(Abortable::new(pending(), reg)); + + (token, flag) + } +} + +impl StopToken for AsyncStopToken { + fn stop(self) { + self.0.abort() + } +} + +impl AsyncStopFlag { + /// Returns true if the stop token linked to `self` was used. + pub fn is_stopped(&self) -> bool { + self.0.is_aborted() + } +} + +/// This future resolves when a stop token was used. +impl Future for AsyncStopFlag { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + self.project().0.poll(cx).map(|res| { + debug_assert!( + res.is_err(), + "Pending Future can't ever be resolved, so Abortable is only resolved when \ + canceled" + ); + }) + } +} diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index a3ec5097..a0d9edca 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -96,107 +96,102 @@ //! //! [`UpdateListener`]: UpdateListener //! [`polling_default`]: polling_default -//! [`polling`]: polling +//! [`polling`]: polling() //! [`Box::get_updates`]: crate::requests::Requester::get_updates //! [getting updates]: https://core.telegram.org/bots/api#getting-updates //! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling //! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science) //! [webhook]: https://en.wikipedia.org/wiki/Webhook -use futures::{stream, Stream, StreamExt}; +use futures::Stream; -use std::{convert::TryInto, time::Duration}; -use teloxide_core::{ - requests::{HasPayload, Request, Requester}, - types::{AllowedUpdate, SemiparsedVec, Update}, +use std::time::Duration; + +use crate::{ + dispatching::stop_token::StopToken, + types::{AllowedUpdate, Update}, }; -/// A generic update listener. -pub trait UpdateListener: Stream> { - // TODO: add some methods here (.shutdown(), etc). -} -impl UpdateListener for S where S: Stream> {} +mod polling; +mod stateful_listener; -/// Returns a long polling update listener with `timeout` of 10 seconds. +pub use self::{ + polling::{polling, polling_default}, + stateful_listener::StatefulListener, +}; + +/// An update listener. /// -/// See also: [`polling`](polling). -pub fn polling_default(requester: R) -> impl UpdateListener -where - R: Requester, - ::GetUpdatesFaultTolerant: Send, -{ - polling(requester, Some(Duration::from_secs(10)), None, None) +/// Implementors of this trait allow getting updates from Telegram. +/// +/// Currently Telegram has 2 ways of getting updates -- [polling] and +/// [webhooks]. Currently, only the former one is implemented (see [`polling()`] +/// and [`polling_default`]) +/// +/// Some functions of this trait are located in the supertrait +/// ([`AsUpdateStream`]), see also: +/// - [`AsUpdateStream::Stream`] +/// - [`AsUpdateStream::as_stream`] +/// +/// [polling]: self#long-polling +/// [webhooks]: self#webhooks +pub trait UpdateListener: for<'a> AsUpdateStream<'a, E> { + /// The type of token which allows to stop this listener. + type StopToken: StopToken; + + /// Returns a token which stops this listener. + /// + /// The [`stop`] function of the token is not guaranteed to have an + /// immediate effect. That is, some listeners can return updates even + /// after [`stop`] is called (e.g.: because of buffering). + /// + /// [`stop`]: StopToken::stop + /// + /// Implementors of this function are encouraged to stop listening for + /// updates as soon as possible and return `None` from the update stream as + /// soon as all cached updates are returned. + #[must_use = "This function doesn't stop listening, to stop listening you need to call stop on \ + the returned token"] + fn stop_token(&mut self) -> Self::StopToken; + + /// Hint which updates should the listener listen for. + /// + /// For example [`polling()`] should send the hint as + /// [`GetUpdates::allowed_updates`] + /// + /// Note however that this is a _hint_ and as such, it can be ignored. The + /// listener is not guaranteed to only return updates which types are listed + /// in the hint. + /// + /// [`GetUpdates::allowed_updates`]: + /// crate::payloads::GetUpdates::allowed_updates + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator) { + let _ = hint; + } + + /// The timeout duration hint. + /// + /// This hints how often dispatcher should check for a shutdown. E.g., for + /// [`polling()`] this returns the [`timeout`]. + /// + /// [`timeout`]: crate::payloads::GetUpdates::timeout + /// + /// If you are implementing this trait and not sure what to return from this + /// function, just leave it with the default implementation. + fn timeout_hint(&self) -> Option { + None + } } -/// Returns a long/short polling update listener with some additional options. +/// [`UpdateListener`]'s supertrait/extension. /// -/// - `bot`: Using this bot, the returned update listener will receive updates. -/// - `timeout`: A timeout for polling. -/// - `limit`: Limits the number of updates to be retrieved at once. Values -/// between 1—100 are accepted. -/// - `allowed_updates`: A list the types of updates you want to receive. -/// See [`GetUpdates`] for defaults. -/// -/// See also: [`polling_default`](polling_default). -/// -/// [`GetUpdates`]: crate::payloads::GetUpdates -pub fn polling( - requester: R, - timeout: Option, - limit: Option, - allowed_updates: Option>, -) -> impl UpdateListener -where - R: Requester, - ::GetUpdatesFaultTolerant: Send, -{ - let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); +/// This trait is a workaround to not require GAT. +pub trait AsUpdateStream<'a, E> { + /// The stream of updates from Telegram. + type Stream: Stream> + 'a; - stream::unfold( - (allowed_updates, requester, 0), - move |(mut allowed_updates, bot, mut offset)| async move { - let mut req = bot.get_updates_fault_tolerant(); - let payload = &mut req.payload_mut().0; - payload.offset = Some(offset); - payload.timeout = timeout; - payload.limit = limit; - payload.allowed_updates = allowed_updates.take(); - - let updates = match req.send().await { - Err(err) => vec![Err(err)], - Ok(SemiparsedVec(updates)) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), - }; - - offset = id + 1; - } - - for update in &updates { - if let Err((value, e)) = update { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide-core/issues.", - e, - value - ); - } - } - - updates.into_iter().filter_map(Result::ok).map(Ok).collect::>() - } - }; - - Some((stream::iter(updates), (allowed_updates, bot, offset))) - }, - ) - .flatten() + /// Creates the update [`Stream`]. + /// + /// [`Stream`]: AsUpdateStream::Stream + fn as_stream(&'a mut self) -> Self::Stream; } diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs new file mode 100644 index 00000000..5440fd6a --- /dev/null +++ b/src/dispatching/update_listeners/polling.rs @@ -0,0 +1,179 @@ +use std::{convert::TryInto, time::Duration}; + +use futures::{ + future::{ready, Either}, + stream::{self, Stream, StreamExt}, +}; + +use crate::{ + dispatching::{ + stop_token::{AsyncStopFlag, AsyncStopToken}, + update_listeners::{stateful_listener::StatefulListener, UpdateListener}, + }, + payloads::GetUpdates, + requests::{HasPayload, Request, Requester}, + types::{AllowedUpdate, SemiparsedVec, Update}, +}; + +/// Returns a long polling update listener with `timeout` of 10 seconds. +/// +/// See also: [`polling`](polling). +/// +/// ## Notes +/// +/// This function will automatically delete a webhook if it was set up. +pub async fn polling_default(requester: R) -> impl UpdateListener +where + R: Requester + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + delete_webhook_if_setup(&requester).await; + polling(requester, Some(Duration::from_secs(10)), None, None) +} + +/// Returns a long/short polling update listener with some additional options. +/// +/// - `bot`: Using this bot, the returned update listener will receive updates. +/// - `timeout`: A timeout for polling. +/// - `limit`: Limits the number of updates to be retrieved at once. Values +/// between 1—100 are accepted. +/// - `allowed_updates`: A list the types of updates you want to receive. +/// See [`GetUpdates`] for defaults. +/// +/// See also: [`polling_default`](polling_default). +/// +/// [`GetUpdates`]: crate::payloads::GetUpdates +pub fn polling( + requester: R, + timeout: Option, + limit: Option, + allowed_updates: Option>, +) -> impl UpdateListener +where + R: Requester + 'static, + ::GetUpdatesFaultTolerant: Send, +{ + struct State { + bot: B, + timeout: Option, + limit: Option, + allowed_updates: Option>, + offset: i32, + flag: AsyncStopFlag, + token: AsyncStopToken, + } + + fn stream(st: &mut State) -> impl Stream> + '_ + where + B: Requester, + { + stream::unfold(st, move |state| async move { + let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; + + if flag.is_stopped() { + let mut req = bot.get_updates_fault_tolerant(); + + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: Some(0), + limit: Some(1), + allowed_updates: allowed_updates.take(), + }; + + return match req.send().await { + Ok(_) => None, + Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), + }; + } + + let mut req = bot.get_updates_fault_tolerant(); + req.payload_mut().0 = GetUpdates { + offset: Some(*offset), + timeout: *timeout, + limit: *limit, + allowed_updates: allowed_updates.take(), + }; + + let updates = match req.send().await { + Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), + Ok(SemiparsedVec(updates)) => { + // Set offset to the last update's id + 1 + if let Some(upd) = updates.last() { + let id: i32 = match upd { + Ok(ok) => ok.id, + Err((value, _)) => value["update_id"] + .as_i64() + .expect("The 'update_id' field must always exist in Update") + .try_into() + .expect("update_id must be i32"), + }; + + *offset = id + 1; + } + + for update in &updates { + if let Err((value, e)) = update { + log::error!( + "Cannot parse an update.\nError: {:?}\nValue: {}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide-core/issues.", + e, + value + ); + } + } + + updates.into_iter().filter_map(Result::ok).map(Ok) + } + }; + + Some((Either::Right(stream::iter(updates)), state)) + }) + .flatten() + } + + let (token, flag) = AsyncStopToken::new_pair(); + + let state = State { + bot: requester, + timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), + limit, + allowed_updates, + offset: 0, + flag, + token, + }; + + let stop_token = |st: &mut State<_>| st.token.clone(); + + let hint_allowed_updates = + Some(|state: &mut State<_>, allowed: &mut dyn Iterator| { + // TODO: we should probably warn if there already were different allowed updates + // before + state.allowed_updates = Some(allowed.collect()); + }); + let timeout_hint = Some(move |_: &State<_>| timeout); + + StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint) +} + +async fn delete_webhook_if_setup(requester: &R) +where + R: Requester, +{ + let webhook_info = match requester.get_webhook_info().send().await { + Ok(ok) => ok, + Err(e) => { + log::error!("Failed to get webhook info: {:?}", e); + return; + } + }; + + let is_webhook_setup = !webhook_info.url.is_empty(); + + if is_webhook_setup { + if let Err(e) = requester.delete_webhook().send().await { + log::error!("Failed to delete a webhook: {:?}", e); + } + } +} diff --git a/src/dispatching/update_listeners/stateful_listener.rs b/src/dispatching/update_listeners/stateful_listener.rs new file mode 100644 index 00000000..a9c26576 --- /dev/null +++ b/src/dispatching/update_listeners/stateful_listener.rs @@ -0,0 +1,153 @@ +use std::time::Duration; + +use futures::Stream; + +use crate::{ + dispatching::{ + stop_token::{self, StopToken}, + update_listeners::{AsUpdateStream, UpdateListener}, + }, + types::{AllowedUpdate, Update}, +}; + +/// A listener created from functions. +/// +/// This type allows to turn a stream of updates (+ some additional functions) +/// into an [`UpdateListener`]. +/// +/// For an example of usage, see [`polling`]. +/// +/// [`polling`]: crate::dispatching::update_listeners::polling() +#[non_exhaustive] +pub struct StatefulListener { + /// The state of the listener. + pub state: St, + + /// The function used as [`AsUpdateStream::as_stream`]. + /// + /// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by + /// `&mut`. + pub stream: Assf, + + /// The function used as [`UpdateListener::stop_token`]. + /// + /// Must be of type `for<'a> &'a mut St -> impl StopToken`. + pub stop_token: Sf, + + /// The function used as [`UpdateListener::hint_allowed_updates`]. + /// + /// Must be of type `for<'a, 'b> &'a mut St, &'b mut dyn Iterator -> ()`. + pub hint_allowed_updates: Option, + + /// The function used as [`UpdateListener::timeout_hint`]. + /// + /// Must be of type `for<'a> &'a St -> Option` and callable by + /// `&`. + pub timeout_hint: Option, +} + +type Haufn = for<'a, 'b> fn(&'a mut State, &'b mut dyn Iterator); +type Thfn = for<'a> fn(&'a State) -> Option; + +impl StatefulListener, Thfn> { + /// Creates a new stateful listener from its components. + pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self { + Self::new_with_hints(state, stream, stop_token, None, None) + } +} + +impl StatefulListener { + /// Creates a new stateful listener from its components. + pub fn new_with_hints( + state: St, + stream: Assf, + stop_token: Sf, + hint_allowed_updates: Option, + timeout_hint: Option, + ) -> Self { + Self { state, stream, stop_token, hint_allowed_updates, timeout_hint } + } +} + +impl + StatefulListener< + S, + for<'a> fn(&'a mut S) -> &'a mut S, + for<'a> fn(&'a mut S) -> stop_token::Noop, + Haufn, + Thfn, + > +where + S: Stream> + Unpin + 'static, +{ + /// Creates a new update listener from a stream of updates which ignores + /// stop signals. + /// + /// It won't be possible to ever stop this listener with a stop token. + pub fn from_stream_without_graceful_shutdown(stream: S) -> Self { + let this = Self::new_with_hints( + stream, + |s| s, + |_| stop_token::Noop, + None, + Some(|_| { + // FIXME: replace this by just Duration::MAX once 1.53 releases + // be released + const NANOS_PER_SEC: u32 = 1_000_000_000; + let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1); + + Some(dmax) + }), + ); + + assert_update_listener(this) + } +} + +impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a, E> + for StatefulListener +where + (St, Strm): 'a, + Assf: FnMut(&'a mut St) -> Strm, + Strm: Stream>, +{ + type Stream = Strm; + + fn as_stream(&'a mut self) -> Self::Stream { + (self.stream)(&mut self.state) + } +} + +impl UpdateListener + for StatefulListener +where + Self: for<'a> AsUpdateStream<'a, E>, + Sf: FnMut(&mut St) -> Stt, + Stt: StopToken, + Hauf: FnMut(&mut St, &mut dyn Iterator), + Thf: Fn(&St) -> Option, +{ + type StopToken = Stt; + + fn stop_token(&mut self) -> Stt { + (self.stop_token)(&mut self.state) + } + + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator) { + if let Some(f) = &mut self.hint_allowed_updates { + f(&mut self.state, hint); + } + } + + fn timeout_hint(&self) -> Option { + self.timeout_hint.as_ref().and_then(|f| f(&self.state)) + } +} + +fn assert_update_listener(l: L) -> L +where + L: UpdateListener, +{ + l +} diff --git a/src/dispatching/update_with_cx.rs b/src/dispatching/update_with_cx.rs index 6ac78034..c0b60722 100644 --- a/src/dispatching/update_with_cx.rs +++ b/src/dispatching/update_with_cx.rs @@ -1,6 +1,11 @@ use crate::dispatching::dialogue::GetChatId; use teloxide_core::{ - payloads::SendMessageSetters, + payloads::{ + SendAnimationSetters, SendAudioSetters, SendContactSetters, SendDocumentSetters, + SendLocationSetters, SendMediaGroupSetters, SendMessageSetters, SendPhotoSetters, + SendStickerSetters, SendVenueSetters, SendVideoNoteSetters, SendVideoSetters, + SendVoiceSetters, + }, requests::{Request, Requester}, types::{ChatId, InputFile, InputMedia, Message}, }; @@ -64,6 +69,87 @@ where self.requester.send_message(self.chat_id(), text).reply_to_message_id(self.update.id) } + pub fn reply_audio(&self, audio: InputFile) -> R::SendAudio { + self.requester.send_audio(self.update.chat.id, audio).reply_to_message_id(self.update.id) + } + + pub fn reply_animation(&self, animation: InputFile) -> R::SendAnimation { + self.requester + .send_animation(self.update.chat.id, animation) + .reply_to_message_id(self.update.id) + } + + pub fn reply_document(&self, document: InputFile) -> R::SendDocument { + self.requester + .send_document(self.update.chat.id, document) + .reply_to_message_id(self.update.id) + } + + pub fn reply_photo(&self, photo: InputFile) -> R::SendPhoto { + self.requester.send_photo(self.update.chat.id, photo).reply_to_message_id(self.update.id) + } + + pub fn reply_video(&self, video: InputFile) -> R::SendVideo { + self.requester.send_video(self.update.chat.id, video).reply_to_message_id(self.update.id) + } + + pub fn reply_voice(&self, voice: InputFile) -> R::SendVoice { + self.requester.send_voice(self.update.chat.id, voice).reply_to_message_id(self.update.id) + } + + pub fn reply_media_group(&self, media_group: T) -> R::SendMediaGroup + where + T: IntoIterator, + { + self.requester + .send_media_group(self.update.chat.id, media_group) + .reply_to_message_id(self.update.id) + } + + pub fn reply_location(&self, latitude: f64, longitude: f64) -> R::SendLocation { + self.requester + .send_location(self.update.chat.id, latitude, longitude) + .reply_to_message_id(self.update.id) + } + + pub fn reply_venue( + &self, + latitude: f64, + longitude: f64, + title: T, + address: U, + ) -> R::SendVenue + where + T: Into, + U: Into, + { + self.requester + .send_venue(self.update.chat.id, latitude, longitude, title, address) + .reply_to_message_id(self.update.id) + } + + pub fn reply_video_note(&self, video_note: InputFile) -> R::SendVideoNote { + self.requester + .send_video_note(self.update.chat.id, video_note) + .reply_to_message_id(self.update.id) + } + + pub fn reply_contact(&self, phone_number: T, first_name: U) -> R::SendContact + where + T: Into, + U: Into, + { + self.requester + .send_contact(self.update.chat.id, phone_number, first_name) + .reply_to_message_id(self.update.id) + } + + pub fn reply_sticker(&self, sticker: InputFile) -> R::SendSticker { + self.requester + .send_sticker(self.update.chat.id, sticker) + .reply_to_message_id(self.update.id) + } + pub fn answer_photo(&self, photo: InputFile) -> R::SendPhoto { self.requester.send_photo(self.update.chat.id, photo) } diff --git a/src/features.txt b/src/features.txt new file mode 100644 index 00000000..f6ef21ba --- /dev/null +++ b/src/features.txt @@ -0,0 +1,27 @@ +## Cargo features + +| Feature | Description | +|----------|----------| +| `redis-storage` | Enables the [Redis] storage support for dialogues.| +| `sqlite-storage` | Enables the [Sqlite] storage support for dialogues. | +| `cbor-serializer` | Enables the [CBOR] serializer for dialogues. | +| `bincode-serializer` | Enables the [Bincode] serializer for dialogues. | +| `macros` | Re-exports macros from [`teloxide-macros`]. | +| `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). | +| `rustls` | Enables the [`rustls`] TLS implementation. | +| `ctrlc_handler` | Enables the [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. | +| `auto-send` | Enables the `AutoSend` bot adaptor. | +| `cache-me` | Enables the `CacheMe` bot adaptor. | +| `frunk` | Enables [`teloxide::utils::UpState`]. | +| `full` | Enables all the features except `nightly`. | +| `nightly` | Enables nightly-only features (see the [teloxide-core features]). | + +[Redis]: https://redis.io/ +[Sqlite]: https://www.sqlite.org/ +[CBOR]: https://en.wikipedia.org/wiki/CBOR +[Bincode]: https://github.com/servo/bincode +[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros +[`native-tls`]: https://docs.rs/native-tls +[`rustls`]: https://docs.rs/rustls +[`teloxide::utils::UpState`]: utils::UpState +[teloxide-core features]: https://docs.rs/teloxide-core/latest/teloxide_core/#cargo-features diff --git a/src/lib.rs b/src/lib.rs index 5ecb64cf..d09cda25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,12 @@ //! [`async`/`.await`]: https://rust-lang.github.io/async-book/01_getting_started/01_chapter.html //! [Rust]: https://www.rust-lang.org/ +// This hack is used to cancel formatting for a Markdown table. See [1], [2], and [3]. +// +// [1]: https://github.com/rust-lang/rustfmt/issues/4210 +// [2]: https://github.com/rust-lang/rustfmt/issues/4787 +// [3]: https://github.com/rust-lang/rust/issues/82768#issuecomment-803935643 +#![cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("features.txt")))] // https://github.com/teloxide/teloxide/raw/master/logo.svg doesn't work in html_logo_url, I don't know why. #![doc( html_logo_url = "https://github.com/teloxide/teloxide/raw/master/ICON.png", @@ -40,14 +46,19 @@ )] #![allow(clippy::match_bool)] #![forbid(unsafe_code)] -#![cfg_attr(all(feature = "nightly", doctest), feature(external_doc))] -// we pass "--cfg docsrs" when building docs to add `This is supported on feature="..." only.` +// We pass "--cfg docsrs" when building docs to add `This is supported on +// feature="..." only.` +// +// "--cfg dep_docsrs" is used for the same reason, but for `teloxide-core`. // // To properly build docs of this crate run // ```console -// $ RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --open --all-features +// $ RUSTFLAGS="--cfg dep_docsrs" RUSTDOCFLAGS="--cfg docsrs -Znormalize-docs" cargo +nightly doc --open --all-features // ``` #![cfg_attr(all(docsrs, feature = "nightly"), feature(doc_cfg))] +#![allow(clippy::redundant_pattern_matching)] +// https://github.com/rust-lang/rust-clippy/issues/7422 +#![allow(clippy::nonstandard_macro_braces)] pub use dispatching::repls::{ commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl, @@ -73,7 +84,7 @@ pub use teloxide_macros as macros; pub use teloxide_macros::teloxide; #[cfg(all(feature = "nightly", doctest))] -#[doc(include = "../README.md")] +#[cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("../README.md")))] enum ReadmeDocTests {} use teloxide_core::requests::ResponseResult; diff --git a/src/logging.rs b/src/logging.rs index a1a7e11d..fd7e5192 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,7 +1,7 @@ /// Enables logging through [pretty-env-logger]. /// -/// A logger will **only** print errors from teloxide and **all** logs from -/// your program. +/// A logger will **only** print errors, warnings, and general information from +/// teloxide and **all** logs from your program. /// /// # Example /// ```no_compile @@ -23,8 +23,8 @@ macro_rules! enable_logging { /// Enables logging through [pretty-env-logger] with a custom filter for your /// program. /// -/// A logger will **only** print errors from teloxide and restrict logs from -/// your program by the specified filter. +/// A logger will **only** print errors, warnings, and general information from +/// teloxide and restrict logs from your program by the specified filter. /// /// # Example /// Allow printing all logs from your program up to [`LevelFilter::Debug`] (i.e. @@ -46,7 +46,7 @@ macro_rules! enable_logging_with_filter { pretty_env_logger::formatted_builder() .write_style(pretty_env_logger::env_logger::WriteStyle::Auto) .filter(Some(&env!("CARGO_PKG_NAME").replace("-", "_")), $filter) - .filter(Some("teloxide"), log::LevelFilter::Error) + .filter(Some("teloxide"), log::LevelFilter::Info) .init(); }; } diff --git a/src/utils/command.rs b/src/utils/command.rs index 22a795e4..5dd3f852 100644 --- a/src/utils/command.rs +++ b/src/utils/command.rs @@ -76,7 +76,7 @@ pub use teloxide_macros::BotCommand; /// # } /// ``` /// -/// ## Enum attributes +/// # Enum attributes /// 1. `#[command(rename = "rule")]` /// Rename all commands by `rule`. Allowed rules are `lowercase`. If you will /// not use this attribute, commands will be parsed by their original names. @@ -93,7 +93,7 @@ pub use teloxide_macros::BotCommand; /// after the first space into the first argument, which must implement /// [`FromStr`]. /// -/// ### Example +/// ## Example /// ``` /// # #[cfg(feature = "macros")] { /// use teloxide::utils::command::BotCommand; @@ -113,7 +113,7 @@ pub use teloxide_macros::BotCommand; /// space character) and parses each part into the corresponding arguments, /// which must implement [`FromStr`]. /// -/// ### Example +/// ## Example /// ``` /// # #[cfg(feature = "macros")] { /// use teloxide::utils::command::BotCommand; @@ -133,7 +133,7 @@ pub use teloxide_macros::BotCommand; /// Specify separator used by the `split` parser. It will be ignored when /// accompanied by another type of parsers. /// -/// ### Example +/// ## Example /// ``` /// # #[cfg(feature = "macros")] { /// use teloxide::utils::command::BotCommand; @@ -149,20 +149,24 @@ pub use teloxide_macros::BotCommand; /// # } /// ``` /// -/// ## Variant attributes +/// # Variant attributes /// All variant attributes override the corresponding `enum` attributes. /// /// 1. `#[command(rename = "rule")]` /// Rename one command by a rule. Allowed rules are `lowercase`, `%some_name%`, /// where `%some_name%` is any string, a new name. /// -/// 2. `#[command(parse_with = "parser")]` +/// 2. `#[command(description = "description")]` +/// Give your command a description. Write `"off"` for `"description"` to hide a +/// command. +/// +/// 3. `#[command(parse_with = "parser")]` /// One more option is available for variants. /// - `custom_parser` - your own parser of the signature `fn(String) -> /// Result`, where `Tuple` corresponds to the variant's /// arguments. /// -/// ### Example +/// ## Example /// ``` /// # #[cfg(feature = "macros")] { /// use teloxide::utils::command::{BotCommand, ParseError}; @@ -191,11 +195,11 @@ pub use teloxide_macros::BotCommand; /// # } /// ``` /// -/// 3. `#[command(prefix = "prefix")]` -/// 4. `#[command(description = "description")]` +/// 4. `#[command(prefix = "prefix")]` /// 5. `#[command(separator = "sep")]` /// -/// Analogous to the descriptions above. +/// These attributes just override the corresponding `enum` attributes for a +/// specific variant. /// /// [`FromStr`]: https://doc.rust-lang.org/std/str/trait.FromStr.html /// [`BotCommand`]: crate::utils::command::BotCommand diff --git a/src/utils/html.rs b/src/utils/html.rs index cf22acc4..dc26a299 100644 --- a/src/utils/html.rs +++ b/src/utils/html.rs @@ -44,7 +44,7 @@ pub fn link(url: &str, text: &str) -> String { } /// Builds an inline user mention link with an anchor. -pub fn user_mention(user_id: i32, text: &str) -> String { +pub fn user_mention(user_id: i64, text: &str) -> String { link(format!("tg://user?id={}", user_id).as_str(), text) } diff --git a/src/utils/markdown.rs b/src/utils/markdown.rs index f96fafaa..d778aee2 100644 --- a/src/utils/markdown.rs +++ b/src/utils/markdown.rs @@ -59,7 +59,7 @@ pub fn link(url: &str, text: &str) -> String { } /// Builds an inline user mention link with an anchor. -pub fn user_mention(user_id: i32, text: &str) -> String { +pub fn user_mention(user_id: i64, text: &str) -> String { link(format!("tg://user?id={}", user_id).as_str(), text) } diff --git a/tests/command.rs b/tests/command.rs index deed8647..0a114911 100644 --- a/tests/command.rs +++ b/tests/command.rs @@ -1,3 +1,6 @@ +// https://github.com/rust-lang/rust-clippy/issues/7422 +#![allow(clippy::nonstandard_macro_braces)] + #[cfg(feature = "macros")] use teloxide::utils::command::{BotCommand, ParseError}; diff --git a/tests/redis.rs b/tests/redis.rs index 2b88ad8e..2e011596 100644 --- a/tests/redis.rs +++ b/tests/redis.rs @@ -1,9 +1,8 @@ use std::{ fmt::{Debug, Display}, - future::Future, sync::Arc, }; -use teloxide::dispatching::dialogue::{RedisStorage, Serializer, Storage}; +use teloxide::dispatching::dialogue::{RedisStorage, RedisStorageError, Serializer, Storage}; #[tokio::test] async fn test_redis_json() { @@ -40,32 +39,41 @@ async fn test_redis_cbor() { type Dialogue = String; +macro_rules! test_dialogues { + ($storage:expr, $_0:expr, $_1:expr, $_2:expr) => { + assert_eq!(Arc::clone(&$storage).get_dialogue(1).await.unwrap(), $_0); + assert_eq!(Arc::clone(&$storage).get_dialogue(11).await.unwrap(), $_1); + assert_eq!(Arc::clone(&$storage).get_dialogue(256).await.unwrap(), $_2); + }; +} + async fn test_redis(storage: Arc>) where S: Send + Sync + Serializer + 'static, >::Error: Debug + Display, { - check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await; - check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await; - check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await; + test_dialogues!(storage, None, None, None); - // 1 - ABC, 11 - DEF, 256 - GHI + Arc::clone(&storage).update_dialogue(1, "ABC".to_owned()).await.unwrap(); + Arc::clone(&storage).update_dialogue(11, "DEF".to_owned()).await.unwrap(); + Arc::clone(&storage).update_dialogue(256, "GHI".to_owned()).await.unwrap(); - check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await; - check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await; + test_dialogues!( + storage, + Some("ABC".to_owned()), + Some("DEF".to_owned()), + Some("GHI".to_owned()) + ); - // 1 - GKL, 11 - DEF, 256 - MNO + Arc::clone(&storage).remove_dialogue(1).await.unwrap(); + Arc::clone(&storage).remove_dialogue(11).await.unwrap(); + Arc::clone(&storage).remove_dialogue(256).await.unwrap(); - check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await; - check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await; - check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await; -} - -async fn check_dialogue( - expected: impl Into>, - actual: impl Future, E>>, -) where - E: Debug, -{ - assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap()) + test_dialogues!(storage, None, None, None); + + // Check that a try to remove a non-existing dialogue results in an error. + assert!(matches!( + Arc::clone(&storage).remove_dialogue(1).await.unwrap_err(), + RedisStorageError::DialogueNotFound + )); } diff --git a/tests/sqlite.rs b/tests/sqlite.rs index bf08142d..a66c745f 100644 --- a/tests/sqlite.rs +++ b/tests/sqlite.rs @@ -1,9 +1,8 @@ use std::{ fmt::{Debug, Display}, - future::Future, sync::Arc, }; -use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, Storage}; +use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, SqliteStorageError, Storage}; #[tokio::test(flavor = "multi_thread")] async fn test_sqlite_json() { @@ -36,32 +35,41 @@ async fn test_sqlite_cbor() { type Dialogue = String; +macro_rules! test_dialogues { + ($storage:expr, $_0:expr, $_1:expr, $_2:expr) => { + assert_eq!(Arc::clone(&$storage).get_dialogue(1).await.unwrap(), $_0); + assert_eq!(Arc::clone(&$storage).get_dialogue(11).await.unwrap(), $_1); + assert_eq!(Arc::clone(&$storage).get_dialogue(256).await.unwrap(), $_2); + }; +} + async fn test_sqlite(storage: Arc>) where S: Send + Sync + Serializer + 'static, >::Error: Debug + Display, { - check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await; - check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await; - check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await; + test_dialogues!(storage, None, None, None); - // 1 - ABC, 11 - DEF, 256 - GHI + Arc::clone(&storage).update_dialogue(1, "ABC".to_owned()).await.unwrap(); + Arc::clone(&storage).update_dialogue(11, "DEF".to_owned()).await.unwrap(); + Arc::clone(&storage).update_dialogue(256, "GHI".to_owned()).await.unwrap(); - check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await; - check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await; + test_dialogues!( + storage, + Some("ABC".to_owned()), + Some("DEF".to_owned()), + Some("GHI".to_owned()) + ); - // 1 - GKL, 11 - DEF, 256 - MNO + Arc::clone(&storage).remove_dialogue(1).await.unwrap(); + Arc::clone(&storage).remove_dialogue(11).await.unwrap(); + Arc::clone(&storage).remove_dialogue(256).await.unwrap(); - check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await; - check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await; - check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await; -} - -async fn check_dialogue( - expected: impl Into>, - actual: impl Future, E>>, -) where - E: Debug, -{ - assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap()) + test_dialogues!(storage, None, None, None); + + // Check that a try to remove a non-existing dialogue results in an error. + assert!(matches!( + Arc::clone(&storage).remove_dialogue(1).await.unwrap_err(), + SqliteStorageError::DialogueNotFound + )); }