diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 00000000..d6e070ff --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,12 @@ +[alias] +# We pass "--cfg docsrs" when building docs to turn on nightly-only rustdoc features like +# `This is supported on feature="..." only.` +# +# "--cfg dep_docsrs" is used for the same reason, but for `teloxide-core`. +docs = """ +doc + --all-features + --config build.rustflags=["--cfg=dep_docsrs"] + --config build.rustdocflags=["--cfg=docsrs","-Znormalize-docs"] + -Zrustdoc-scrape-examples=examples +""" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e990ad6a..c2ccac7e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,47 +7,64 @@ on: name: Continuous integration env: - RUSTFLAGS: "--cfg CI_REDIS" + RUSTFLAGS: "--cfg CI_REDIS -Dwarnings" + RUSTDOCFLAGS: -Dwarnings + RUST_BACKTRACE: short + + CARGO_INCREMENTAL: 0 + CARGO_NET_RETRY: 10 + RUSTUP_MAX_RETRIES: 10 + + rust_nightly: nightly-2022-07-01 + # When updating this, also update: + # - README.md + # - src/lib.rs + # - down below in a matrix + rust_msrv: 1.58.0 jobs: - style: + # Depends on all action that are required for a "successful" CI run. + ci-pass: + name: CI succeeded + runs-on: ubuntu-latest + + needs: + - fmt + - test + - check-examples + - clippy + - doc + + steps: + - run: exit 0 + + fmt: + name: fmt runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Rust ${{ env.rust_nightly }} + uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2022-05-06 + toolchain: ${{ env.rust_nightly }} override: true components: rustfmt - - name: fmt + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1 + + - name: Check formatting uses: actions-rs/cargo@v1 with: command: fmt - args: --all -- --check - - clippy: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: nightly-2022-05-06 - override: true - components: clippy - - - name: clippy - uses: actions-rs/cargo@v1 - with: - command: clippy - args: --all-targets --all-features -- -D warnings + args: --all -- --check test: + name: Test runs-on: ubuntu-latest strategy: matrix: @@ -65,48 +82,120 @@ jobs: toolchain: beta features: "--features full" - rust: nightly - toolchain: nightly-2022-05-06 + toolchain: nightly-2022-07-01 features: "--all-features" - rust: msrv - toolchain: "1.58.0" + toolchain: 1.58.0 features: "--features full" - steps: - - uses: actions/checkout@v2 - - - uses: actions-rs/toolchain@v1 + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Rust ${{ matrix.toolchain }} + uses: actions-rs/toolchain@v1 with: profile: minimal toolchain: ${{ matrix.toolchain }} override: true - - - name: build + + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1 + + # NB. Don't test (build) examples so we can use non-msrv features in them (--tests/--doc) + - name: Compile uses: actions-rs/cargo@v1 with: - command: build - args: --verbose ${{ matrix.features }} - + command: test + args: --tests --no-run --verbose ${{ matrix.features }} + - name: Setup redis run: | sudo apt install redis-server redis-server --port 7777 > /dev/null & redis-server --port 7778 > /dev/null & redis-server --port 7779 > /dev/null & - - - name: test + + - name: Test unit & integration tests uses: actions-rs/cargo@v1 with: command: test - args: --verbose ${{ matrix.features }} + args: --tests --verbose ${{ matrix.features }} + + - name: Test documentation tests + if: ${{ matrix.rust != 'msrv' }} + uses: actions-rs/cargo@v1 + with: + command: test + args: --doc --verbose ${{ matrix.features }} - build-example: + check-examples: runs-on: ubuntu-latest + steps: - - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Rust stable + uses: actions-rs/toolchain@v1 with: profile: minimal toolchain: stable override: true - - name: Check the examples - run: cargo check --examples --features="full" + + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1 + + - name: Check examples + uses: actions-rs/cargo@v1 + with: + command: check + args: --examples --features full + + clippy: + name: Run linter + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Rust ${{ env.rust_nightly }} + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ env.rust_nightly }} + override: true + components: clippy + + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1 + + - name: clippy + uses: actions-rs/cargo@v1 + with: + command: clippy + args: --all-targets --all-features + + doc: + name: check docs + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Install Rust ${{ env.rust_nightly }} + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: ${{ env.rust_nightly }} + override: true + + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1 + + - name: rustdoc + uses: actions-rs/cargo@v1 + with: + command: docs # from .cargo/config.toml diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e5fa740..2198111f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## unreleased +## 0.10.0 - 2022-07-21 + +### Added + + - Security checks based on `secret_token` param of `set_webhook` to built-in webhooks. + - `dispatching::update_listeners::{PollingBuilder, Polling, PollingStream}`. + - `DispatcherBuilder::enable_ctrlc_handler` method. + +### Fixed + + - `Dispatcher` no longer "leaks" memory for every inactive user ([PR 657](https://github.com/teloxide/teloxide/pull/657)). + - Allow specifying a path to a custom command parser in `parse_with` ([issue 668](https://github.com/teloxide/teloxide/issues/668)). + +### Changed + + - Add the `Key: Clone` requirement for `impl Dispatcher` [**BC**]. + - `dispatching::update_listeners::{polling_default, polling}` now return a named, `Polling<_>` type. + - Update teloxide-core to v0.7.0 with Bot API 6.1 support, see [its changelog][core07c] for more information [**BC**]. + +[core07c]: https://github.com/teloxide/teloxide-core/blob/master/CHANGELOG.md#070---2022-07-19 + +### Deprecated + +- The `dispatching::update_listeners::polling` function. +- `Dispatcher::setup_ctrlc_handler` method. + ## 0.9.2 - 2022-06-07 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 2fc06eb3..df04f34e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "teloxide" -version = "0.9.2" +version = "0.10.0" edition = "2021" description = "An elegant Telegram bots framework for Rust" repository = "https://github.com/teloxide/teloxide" @@ -14,7 +14,8 @@ exclude = ["media"] [features] default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send"] -webhooks-axum = ["axum", "tower", "tower-http"] +webhooks = ["rand"] +webhooks-axum = ["webhooks", "axum", "tower", "tower-http"] sqlite-storage = ["sqlx"] redis-storage = ["redis"] @@ -56,16 +57,21 @@ full = [ ] [dependencies] -teloxide-core = { version = "0.6.0", default-features = false } -teloxide-macros = { version = "0.6.2", optional = true } +teloxide-core = { version = "0.7.0", default-features = false } +teloxide-macros = { version = "0.6.3", optional = true } serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } -dptree = "0.2.1" +dptree = "0.3.0" + +# These lines are used only for development. +# teloxide-core = { git = "https://github.com/teloxide/teloxide-core", rev = "b13393d", default-features = false } +# teloxide-macros = { git = "https://github.com/teloxide/teloxide-macros", rev = "44d91c5", optional = true } +# dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" } tokio = { version = "1.8", features = ["fs"] } -tokio-util = "0.6" +tokio-util = "0.7" tokio-stream = "0.1.8" url = "2.2.2" @@ -80,27 +86,26 @@ pin-project = "1.0" serde_with_macros = "1.4" aquamarine = "0.1.11" -sqlx = { version = "0.5", optional = true, default-features = false, features = [ +sqlx = { version = "0.6", optional = true, default-features = false, features = [ "runtime-tokio-native-tls", "macros", "sqlite", ] } -redis = { version = "0.20", features = ["tokio-comp"], optional = true } +redis = { version = "0.21", features = ["tokio-comp"], optional = true } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } -axum = { version = "0.4.8", optional = true } +axum = { version = "0.5.13", optional = true } tower = { version = "0.4.12", optional = true } -tower-http = { version = "0.2.5", features = ["trace"], optional = true } +tower-http = { version = "0.3.4", features = ["trace"], optional = true } +rand = { version = "0.8.5", optional = true } [dev-dependencies] rand = "0.8.3" pretty_env_logger = "0.4.0" -once_cell = "1.9.0" serde = "1" serde_json = "1" tokio = { version = "1.8", features = ["fs", "rt-multi-thread", "macros"] } -warp = "0.3.0" -reqwest = "0.10.4" +reqwest = "0.11.11" chrono = "0.4" tokio-stream = "0.1" @@ -153,6 +158,10 @@ required-features = ["macros"] name = "ngrok_ping_pong" required-features = ["webhooks-axum"] +[[example]] +name = "heroku_ping_pong" +required-features = ["webhooks-axum"] + [[example]] name = "purchase" required-features = ["macros"] diff --git a/MIGRATION_GUIDE.md b/MIGRATION_GUIDE.md index 315cd9f1..78003f48 100644 --- a/MIGRATION_GUIDE.md +++ b/MIGRATION_GUIDE.md @@ -1,6 +1,47 @@ This document describes breaking changes of `teloxide` crate, as well as the ways to update code. Note that the list of required changes is not fully exhaustive and it may lack something in rare cases. +## 0.9 -> 0.10 + +### core + +We've added some convenience functions to `InlineKeyboardButton` so it's easier to construct it. Consider using them instead of variants: +```diff +-InlineKeyboardButton::new("text", InlineKeyboardButtonKind::Url(url)) ++InlineKeyboardButton::url("text", url) +``` + +`file_size` fields are now `u32`, you may need to update your code accordingly: + +```diff +-let file_size: u64 = audio.file_size?; ++let file_size: u32 = audio.file_size; +``` + +Some places now use `FileMeta` instead of `File`, you may need to change types. + +`Sticker` and `StickerSet` now has a `kind` field instead of `is_animated` and `is_video`: + +```diff ++use teloxide::types::StickerKind::*; +-match () { ++match sticker.kind { +- _ if sticker.is_animated => /* handle animated */, ++ Animated => /* handle animated */, +- _ if sticker.is_video => /* handle video */, ++ Video => /* handle video */, +- _ => /* handle normal */, ++ Webp => /* handle normal */, +} +``` + +### teloxide + +Teloxide itself doesn't have any major API changes. +Note however that some function were deprecated: +- Instead of `dispatching::update_listeners::polling` use `polling_builder` +- Instead of `Dispatcher::setup_ctrlc_handler` use `DispatcherBuilder::enable_ctrlc_handler` + ## 0.7 -> 0.8 ### core @@ -8,7 +49,7 @@ Note that the list of required changes is not fully exhaustive and it may lack s `user.id` now uses `UserId` type, `ChatId` now represents only _chat id_, not channel username, all `chat_id` function parameters now accept `Recipient` (if they allow for channel usernames). If you used to work with chat/user ids (for example saving them to a database), you may need to change your code to account for new types. Some examples how that may look like: -```diff, +```diff -let user_id: i64 = user.id; +let UserId(user_id) = user.id; db.save(user_id, ...); diff --git a/README.md b/README.md index 0e758242..b05f48f8 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,4 @@ -> [v0.7 -> v0.8 migration guide >>](MIGRATION_GUIDE.md#07---08) - -> `teloxide-core` versions less that `0.4.5` (`teloxide` versions less than 0.7.3) have a low-severity security vulnerability, [learn more >>](https://github.com/teloxide/teloxide/discussions/574) +> [v0.9 -> v0.10 migration guide >>](MIGRATION_GUIDE.md#09---010)
@@ -15,7 +13,7 @@ - + @@ -72,7 +70,7 @@ $ rustup override set nightly 5. Run `cargo new my_bot`, enter the directory and put these lines into your `Cargo.toml`: ```toml [dependencies] -teloxide = { version = "0.9", features = ["macros", "auto-send"] } +teloxide = { version = "0.10", features = ["macros", "auto-send"] } log = "0.4" pretty_env_logger = "0.4" tokio = { version = "1.8", features = ["rt-multi-thread", "macros"] } @@ -105,9 +103,7 @@ async fn main() { ```
- - - +
### Commands @@ -175,9 +171,7 @@ async fn answer( ```
- - - +
### Dialogues management @@ -229,8 +223,8 @@ async fn main() { ), ) .dependencies(dptree::deps![InMemStorage::::new()]) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch() .await; } @@ -300,9 +294,7 @@ async fn receive_location( ```
- - - +
[More examples >>](examples/) @@ -327,11 +319,7 @@ A: No, only the bots API. **Q: Can I use webhooks?** -A: teloxide doesn't provide a special API for working with webhooks due to their nature with lots of subtle settings. Instead, you should setup your webhook by yourself, as shown in [`examples/ngrok_ping_pong_bot`](examples/ngrok_ping_pong.rs) and [`examples/heroku_ping_pong_bot`](examples/heroku_ping_pong.rs). - -Associated links: - - [Marvin's Marvellous Guide to All Things Webhook](https://core.telegram.org/bots/webhooks) - - [Using self-signed certificates](https://core.telegram.org/bots/self-signed) +A: You can! Teloxide has a built-in support for webhooks in `dispatching::update_listeners::webhooks` module. See how it's used in [`examples/ngrok_ping_pong_bot`](examples/ngrok_ping_pong.rs) and [`examples/heroku_ping_pong_bot`](examples/heroku_ping_pong.rs). **Q: Can I handle both callback queries and messages within a single dialogue?** @@ -341,29 +329,33 @@ A: Yes, see [`examples/purchase.rs`](examples/purchase.rs). Feel free to propose your own bot to our collection! - - [WaffleLapkin/crate_upd_bot](https://github.com/WaffleLapkin/crate_upd_bot) — A bot that notifies about crate updates. - - [mxseev/logram](https://github.com/mxseev/logram) — Utility that takes logs from anywhere and sends them to Telegram. - - [alexkonovalov/PedigreeBot](https://github.com/alexkonovalov/PedigreeBot) — A Telegram bot for building family trees. - - [Hermitter/tepe](https://github.com/Hermitter/tepe) — A CLI to command a bot to send messages and files over Telegram. - - [mattrighetti/GroupActivityBot](https://github.com/mattrighetti/group-activity-bot-rs) — Telegram bot that keeps track of user activity in groups. - - [mattrighetti/libgen-bot-rs](https://github.com/mattrighetti/libgen-bot-rs) — Telgram bot to interface with libgen - - [dracarys18/grpmr-rs](https://github.com/dracarys18/grpmr-rs) — A Telegram group manager bot with variety of extra features. - - [steadylearner/subreddit_reader](https://github.com/steadylearner/Rust-Full-Stack/tree/master/commits/teloxide/subreddit_reader) — A bot that shows the latest posts at Rust subreddit. - - [myblackbeard/basketball-betting-bot](https://github.com/myblackbeard/basketball-betting-bot) — The bot lets you bet on NBA games against your buddies. - - [ArtHome12/vzmuinebot](https://github.com/ArtHome12/vzmuinebot) — Telegram bot for food menu navigate. - - [ArtHome12/cognito_bot](https://github.com/ArtHome12/cognito_bot) — The bot is designed to anonymize messages to a group. - - [pro-vim/tg-vimhelpbot](https://github.com/pro-vim/tg-vimhelpbot) — Link `:help` for Vim in Telegram. - - [sschiz/janitor-bot](https://github.com/sschiz/janitor-bot) — A bot that removes users trying to join to a chat that is designed for comments. - - [slondr/BeerHolderBot](https://gitlab.com/slondr/BeerHolderBot) — A bot that holds your beer. - - [MustafaSalih1993/Miss-Vodka-Telegram-Bot](https://github.com/MustafaSalih1993/Miss-Vodka-Telegram-Bot) — A Telegram bot written in rust using "Teloxide" library. - - [x13a/tg-prompt](https://github.com/x13a/tg-prompt) — Telegram prompt. - - [magnickolas/remindee-bot](https://github.com/magnickolas/remindee-bot) — Telegram bot for managing reminders. - - [cyberknight777/knight-bot](https://gitlab.com/cyberknight777/knight-bot) — A Telegram bot with variety of fun features. - - [wa7sa34cx/the-black-box-bot](https://github.com/wa7sa34cx/the-black-box-bot) — This is the Black Box Telegram bot. You can hold any items in it. - - [crapstone/hsctt](https://codeberg.org/crapstones-bots/hsctt) — A Telegram bot that searches for HTTP status codes in all messages and replies with the text form. - - [alenpaul2001/AurSearchBot](https://gitlab.com/alenpaul2001/aursearchbot) — Telegram bot for searching AUR in inline mode. - - [studiedlist/EddieBot](https://gitlab.com/studiedlist/eddie-bot) — Chatting bot with several entertainment features. - + - [`raine/tgreddit`](https://github.com/raine/tgreddit) — A bot that sends the top posts of your favorite subreddits to Telegram. + - [`magnickolas/remindee-bot`](https://github.com/magnickolas/remindee-bot) — Telegram bot for managing reminders. + - [`WaffleLapkin/crate_upd_bot`](https://github.com/WaffleLapkin/crate_upd_bot) — A bot that notifies about crate updates. + - [`mattrighetti/GroupActivityBot`](https://github.com/mattrighetti/group-activity-bot-rs) — Telegram bot that keeps track of user activity in groups. + - [`alenpaul2001/AurSearchBot`](https://gitlab.com/alenpaul2001/aursearchbot) — Telegram bot for searching in Arch User Repository (AUR). + - [`ArtHome12/vzmuinebot`](https://github.com/ArtHome12/vzmuinebot) — Telegram bot for food menu navigate. + - [`studiedlist/EddieBot`](https://gitlab.com/studiedlist/eddie-bot) — Chatting bot with several entertainment features. + - [`modos189/tg_blackbox_bot`](https://gitlab.com/modos189/tg_blackbox_bot) — Anonymous feedback for your Telegram project. + - [`0xNima/spacecraft`](https://github.com/0xNima/spacecraft) — Yet another telegram bot to downloading Twitter spaces. + - [`0xNima/Twideo`](https://github.com/0xNima/Twideo) — Simple Telegram Bot for downloading videos from Twitter via their links. + - [`mattrighetti/libgen-bot-rs`](https://github.com/mattrighetti/libgen-bot-rs) — Telgram bot to interface with libgen. + - [`zamazan4ik/npaperbot-telegram`](https://github.com/zamazan4ik/npaperbot-telegram) — Telegram bot for searching via C++ proposals. + +
+Show bots using teloxide older than v0.6.0 + + - [`mxseev/logram`](https://github.com/mxseev/logram) — Utility that takes logs from anywhere and sends them to Telegram. + - [`alexkonovalov/PedigreeBot`](https://github.com/alexkonovalov/PedigreeBot) — A Telegram bot for building family trees. + - [`Hermitter/tepe`](https://github.com/Hermitter/tepe) — A CLI to command a bot to send messages and files over Telegram. + - [`myblackbeard/basketball-betting-bot`](https://github.com/myblackbeard/basketball-betting-bot) — The bot lets you bet on NBA games against your buddies. + - [`dracarys18/grpmr-rs`](https://github.com/dracarys18/grpmr-rs) — Modular Telegram Group Manager Bot written in Rust. + - [`ArtHome12/vzmuinebot`](https://github.com/ArtHome12/vzmuinebot) — Telegram bot for food menu navigate. + - [`ArtHome12/cognito_bot`](https://github.com/ArtHome12/cognito_bot) — The bot is designed to anonymize messages to a group. + - [`crapstone/hsctt`](https://codeberg.org/crapstones-bots/hsctt) — A bot that converts HTTP status codes into text. + +
+ ## Contributing See [`CONRIBUTING.md`](CONTRIBUTING.md). diff --git a/examples/buttons.rs b/examples/buttons.rs index 5b16fe35..b510e24a 100644 --- a/examples/buttons.rs +++ b/examples/buttons.rs @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box> { .branch(Update::filter_callback_query().endpoint(callback_handler)) .branch(Update::filter_inline_query().endpoint(inline_query_handler)); - Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await; + Dispatcher::builder(bot, handler).enable_ctrlc_handler().build().dispatch().await; Ok(()) } diff --git a/examples/db_remember.rs b/examples/db_remember.rs index dfac7bb0..1aed2808 100644 --- a/examples/db_remember.rs +++ b/examples/db_remember.rs @@ -14,18 +14,13 @@ type MyDialogue = Dialogue>; type MyStorage = std::sync::Arc>; type HandlerResult = Result<(), Box>; -#[derive(Clone, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Default, serde::Serialize, serde::Deserialize)] pub enum State { + #[default] Start, GotNumber(i32), } -impl Default for State { - fn default() -> Self { - Self::Start - } -} - #[derive(Clone, BotCommands)] #[command(rename = "lowercase", description = "These commands are supported:")] pub enum Command { @@ -59,8 +54,8 @@ async fn main() { Dispatcher::builder(bot, handler) .dependencies(dptree::deps![storage]) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch() .await; } diff --git a/examples/dialogue.rs b/examples/dialogue.rs index 3156b924..8e30219d 100644 --- a/examples/dialogue.rs +++ b/examples/dialogue.rs @@ -18,18 +18,18 @@ use teloxide::{dispatching::dialogue::InMemStorage, prelude::*}; type MyDialogue = Dialogue>; type HandlerResult = Result<(), Box>; -#[derive(Clone)] +#[derive(Clone, Default)] pub enum State { + #[default] Start, ReceiveFullName, - ReceiveAge { full_name: String }, - ReceiveLocation { full_name: String, age: u8 }, -} - -impl Default for State { - fn default() -> Self { - Self::Start - } + ReceiveAge { + full_name: String, + }, + ReceiveLocation { + full_name: String, + age: u8, + }, } #[tokio::main] @@ -51,8 +51,8 @@ async fn main() { ), ) .dependencies(dptree::deps![InMemStorage::::new()]) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch() .await; } diff --git a/examples/dispatching_features.rs b/examples/dispatching_features.rs index 91ef0808..dcaf4fcc 100644 --- a/examples/dispatching_features.rs +++ b/examples/dispatching_features.rs @@ -87,8 +87,8 @@ async fn main() { .error_handler(LoggingErrorHandler::with_custom_text( "An error has occurred in the dispatcher", )) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch() .await; } diff --git a/examples/heroku_ping_pong.rs b/examples/heroku_ping_pong.rs index 6374ed78..8238dd9a 100644 --- a/examples/heroku_ping_pong.rs +++ b/examples/heroku_ping_pong.rs @@ -16,25 +16,12 @@ // heroku buildpacks:set emk/rust // ``` // -// [1] https://github.com/emk/heroku-buildpack-rust +// [1]: https://github.com/emk/heroku-buildpack-rust -// TODO: use built-in webhook support +use std::env; -use teloxide::{ - dispatching::{ - stop_token::AsyncStopToken, - update_listeners::{self, StatefulListener}, - }, - prelude::*, - types::Update, -}; - -use std::{convert::Infallible, env, net::SocketAddr}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; -use warp::Filter; - -use reqwest::{StatusCode, Url}; +use teloxide::{dispatching::update_listeners::webhooks, prelude::*}; +use url::Url; #[tokio::main] async fn main() { @@ -42,66 +29,31 @@ async fn main() { log::info!("Starting Heroku ping-pong bot..."); let bot = Bot::from_env().auto_send(); + let token = bot.inner().token(); + + // Heroku auto defines a port value + let port: u16 = env::var("PORT") + .expect("PORT env variable is not set") + .parse() + .expect("PORT env variable value is not an integer"); + + let addr = ([0, 0, 0, 0], port).into(); + + // Heroku host example: "heroku-ping-pong-bot.herokuapp.com" + let host = env::var("HOST").expect("HOST env variable is not set"); + let url = Url::parse(&format!("https://{host}/webhooks/{token}")).unwrap(); + + let listener = webhooks::axum(bot.clone(), webhooks::Options::new(addr, url)) + .await + .expect("Couldn't setup webhook"); teloxide::repl_with_listener( - bot.clone(), + bot, |msg: Message, bot: AutoSend| async move { bot.send_message(msg.chat.id, "pong").await?; respond(()) }, - webhook(bot).await, + listener, ) .await; } - -async fn handle_rejection(error: warp::Rejection) -> Result { - log::error!("Cannot process the request due to: {:?}", error); - Ok(StatusCode::INTERNAL_SERVER_ERROR) -} - -pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListener { - // Heroku auto defines a port value - let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing"); - let port: u16 = env::var("PORT") - .expect("PORT env variable missing") - .parse() - .expect("PORT value to be integer"); - // Heroku host example .: "heroku-ping-pong-bot.herokuapp.com" - let host = env::var("HOST").expect("have HOST env variable"); - let path = format!("bot{teloxide_token}"); - let url = Url::parse(&format!("https://{host}/{path}")).unwrap(); - - bot.set_webhook(url).await.expect("Cannot setup a webhook"); - - let (tx, rx) = mpsc::unbounded_channel(); - - let server = warp::post() - .and(warp::path(path)) - .and(warp::body::json()) - .map(move |update: Update| { - tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook"); - - StatusCode::OK - }) - .recover(handle_rejection); - - let (stop_token, stop_flag) = AsyncStopToken::new_pair(); - - let addr = format!("0.0.0.0:{port}").parse::().unwrap(); - let server = warp::serve(server); - let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag); - - // You might want to use serve.key_path/serve.cert_path methods here to - // setup a self-signed TLS certificate. - - tokio::spawn(fut); - let stream = UnboundedReceiverStream::new(rx); - - fn streamf(state: &mut (S, T)) -> &mut S { - &mut state.0 - } - - StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| { - state.1.clone() - }) -} diff --git a/examples/inline.rs b/examples/inline.rs index a85de2c2..837fa30d 100644 --- a/examples/inline.rs +++ b/examples/inline.rs @@ -60,5 +60,5 @@ async fn main() { }, )); - Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await; + Dispatcher::builder(bot, handler).enable_ctrlc_handler().build().dispatch().await; } diff --git a/examples/purchase.rs b/examples/purchase.rs index e8333380..0eac1ad3 100644 --- a/examples/purchase.rs +++ b/examples/purchase.rs @@ -13,10 +13,7 @@ // ``` use teloxide::{ - dispatching::{ - dialogue::{self, InMemStorage}, - UpdateHandler, - }, + dispatching::{dialogue, dialogue::InMemStorage, UpdateHandler}, prelude::*, types::{InlineKeyboardButton, InlineKeyboardMarkup}, utils::command::BotCommands, @@ -25,17 +22,14 @@ use teloxide::{ type MyDialogue = Dialogue>; type HandlerResult = Result<(), Box>; -#[derive(Clone)] +#[derive(Clone, Default)] pub enum State { + #[default] Start, ReceiveFullName, - ReceiveProductChoice { full_name: String }, -} - -impl Default for State { - fn default() -> Self { - Self::Start - } + ReceiveProductChoice { + full_name: String, + }, } #[derive(BotCommands, Clone)] @@ -58,29 +52,30 @@ async fn main() { Dispatcher::builder(bot, schema()) .dependencies(dptree::deps![InMemStorage::::new()]) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch() .await; } fn schema() -> UpdateHandler> { + use dptree::case; + let command_handler = teloxide::filter_command::() .branch( - dptree::case![State::Start] - .branch(dptree::case![Command::Help].endpoint(help)) - .branch(dptree::case![Command::Start].endpoint(start)), + case![State::Start] + .branch(case![Command::Help].endpoint(help)) + .branch(case![Command::Start].endpoint(start)), ) - .branch(dptree::case![Command::Cancel].endpoint(cancel)); + .branch(case![Command::Cancel].endpoint(cancel)); let message_handler = Update::filter_message() .branch(command_handler) - .branch(dptree::case![State::ReceiveFullName].endpoint(receive_full_name)) + .branch(case![State::ReceiveFullName].endpoint(receive_full_name)) .branch(dptree::endpoint(invalid_state)); - let callback_query_handler = Update::filter_callback_query().chain( - dptree::case![State::ReceiveProductChoice { full_name }] - .endpoint(receive_product_selection), + let callback_query_handler = Update::filter_callback_query().branch( + case![State::ReceiveProductChoice { full_name }].endpoint(receive_product_selection), ); dialogue::enter::, State, _>() diff --git a/examples/shared_state.rs b/examples/shared_state.rs index 33f5d4b6..21a0fcc0 100644 --- a/examples/shared_state.rs +++ b/examples/shared_state.rs @@ -1,27 +1,34 @@ // This bot answers how many messages it received in total on every message. -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; -use once_cell::sync::Lazy; use teloxide::prelude::*; -static MESSAGES_TOTAL: Lazy = Lazy::new(AtomicU64::default); - #[tokio::main] async fn main() { pretty_env_logger::init(); log::info!("Starting shared state bot..."); let bot = Bot::from_env().auto_send(); + let messages_total = Arc::new(AtomicU64::new(0)); - let handler = Update::filter_message().branch(dptree::endpoint( - |msg: Message, bot: AutoSend| async move { - let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed); + let handler = Update::filter_message().endpoint( + |msg: Message, bot: AutoSend, messages_total: Arc| async move { + let previous = messages_total.fetch_add(1, Ordering::Relaxed); bot.send_message(msg.chat.id, format!("I received {previous} messages in total.")) .await?; respond(()) }, - )); + ); - Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await; + Dispatcher::builder(bot, handler) + // Pass the shared state to the handler as a dependency. + .dependencies(dptree::deps![messages_total]) + .enable_ctrlc_handler() + .build() + .dispatch() + .await; } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 43d17c19..ea79a434 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "nightly-2022-05-06" +channel = "nightly-2022-07-01" components = ["rustfmt", "clippy"] profile = "minimal" diff --git a/src/dispatching.rs b/src/dispatching.rs index ef958901..e0440705 100644 --- a/src/dispatching.rs +++ b/src/dispatching.rs @@ -1,99 +1,198 @@ //! An update dispatching model based on [`dptree`]. //! -//! In teloxide, updates are dispatched by a pipleine. The central type is -//! [`dptree::Handler`] -- it represents a handler of an update; since the API -//! is highly declarative, you can combine handlers with each other via such -//! methods as [`dptree::Handler::chain`] and [`dptree::Handler::branch`]. The -//! former method pipes one handler to another one, whilst the latter creates a -//! new node, as communicated by the name. For more information, please refer to -//! the documentation of [`dptree`]. +//! In teloxide, update dispatching is declarative: it takes the form of a +//! [chain of responsibility] pattern enriched with a number of combinator +//! functions, which together form an instance of the [`dptree::Handler`] type. //! -//! The pattern itself is called [chain of responsibility], a well-known design -//! technique across OOP developers. But unlike typical object-oriented design, -//! we employ declarative FP-style functions like [`dptree::filter`], -//! [`dptree::filter_map`], and [`dptree::endpoint`]; these functions create -//! special forms of [`dptree::Handler`]; for more information, please refer to -//! their respective documentation. Each of these higher-order functions accept -//! a closure that is made into a handler -- this closure can take any -//! additional parameters, which must be supplied while creating [`Dispatcher`] -//! (see [`DispatcherBuilder::dependencies`]). -//! -//! The [`Dispatcher`] type puts all these things together: it only provides -//! [`Dispatcher::dispatch`] and a handful of other methods. Once you call -//! `.dispatch()`, it will retrieve updates from the Telegram server and pass -//! them to your handler, which is a parameter of [`Dispatcher::builder`]. -//! -//! Let us look at a simple example: -//! -//! -//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/shared_state.rs)) +//! Take [`examples/purchase.rs`] as an example of dispatching logic. First, we +//! define a type named `State` to represent the current state of a dialogue: //! //! ```no_run -//! use std::sync::atomic::{AtomicU64, Ordering}; -//! -//! use once_cell::sync::Lazy; -//! use teloxide::prelude::*; -//! -//! static MESSAGES_TOTAL: Lazy = Lazy::new(AtomicU64::default); -//! -//! # #[tokio::main] -//! # async fn main() { -//! pretty_env_logger::init(); -//! log::info!("Starting shared state bot..."); -//! -//! let bot = Bot::from_env().auto_send(); -//! -//! let handler = Update::filter_message().branch(dptree::endpoint( -//! |msg: Message, bot: AutoSend| async move { -//! let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed); -//! bot.send_message(msg.chat.id, format!("I received {} messages in total.", previous)) -//! .await?; -//! respond(()) +//! #[derive(Clone, Default)] +//! pub enum State { +//! #[default] +//! Start, +//! ReceiveFullName, +//! ReceiveProductChoice { +//! full_name: String, //! }, -//! )); -//! -//! Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await; -//! # } +//! } //! ``` //! -//! 1. First, we create the bot: `let bot = Bot::from_env().auto_send()`. -//! 2. Then we construct an update handler. While it is possible to handle all -//! kinds of [`crate::types::Update`], here we are only interested in -//! [`crate::types::Message`]: [`UpdateFilterExt::filter_message`] create a -//! handler object which filters all messages out of a generic update. -//! 3. By doing `.branch(dptree::endpoint(...))`, we set up a custom handling -//! closure that receives `msg: Message` and `bot: AutoSend`. There are -//! called dependencies: `msg` is supplied by -//! [`UpdateFilterExt::filter_message`], while `bot` is supplied by -//! [`Dispatcher`]. +//! Then, we define a type `Command` to represent user commands such as +//! `/start` or `/help`: //! -//! That being said, if we receive a message, the dispatcher will call our -//! handler, but if we receive something other than a message (e.g., a channel -//! post), you will see an unhandled update notice in your terminal. +//! ```no_run +//! # use teloxide::utils::command::BotCommands; +//! #[derive(BotCommands, Clone)] +//! #[command(rename = "lowercase", description = "These commands are supported:")] +//! enum Command { +//! #[command(description = "display this text.")] +//! Help, +//! #[command(description = "start the purchase procedure.")] +//! Start, +//! #[command(description = "cancel the purchase procedure.")] +//! Cancel, +//! } +//! ``` //! -//! This is a very limited example of update pipelining facilities. In more -//! involved scenarios, there are multiple branches and chains; if one element -//! of a chain fails to handle an update, the update will be passed forwards; if -//! no handler succeeds at handling the update, [`Dispatcher`] will invoke a -//! default handler set up via [`DispatcherBuilder::default_handler`]. +//! Now the key question: how to elegantly dispatch on different combinations of +//! `State`, `Command`, and Telegram updates? -- i.e., we may want to execute +//! specific endpoints only in response to specific user commands and while we +//! are in a given dialogue state (and possibly under other circumstances!). The +//! solution is to use [`dptree`]: //! -//! Update pipelining provides several advantages over the typical `match -//! (update.kind) { ... }` approach: +//! ```no_run +//! # // That's a lot of context needed to compile this, oof +//! # use teloxide::dispatching::{UpdateHandler, UpdateFilterExt, dialogue, dialogue::InMemStorage}; +//! # use teloxide::utils::command::BotCommands; +//! # use teloxide::types::Update; +//! # #[derive(Clone, Default)] pub enum State { #[default] Start, ReceiveFullName, ReceiveProductChoice { full_name: String } } +//! # #[derive(BotCommands, Clone)] enum Command { Help, Start, Cancel } +//! # type HandlerResult = Result<(), Box>; +//! # async fn help() -> HandlerResult { Ok(()) } +//! # async fn start() -> HandlerResult { Ok(()) } +//! # async fn cancel() -> HandlerResult { Ok(()) } +//! # async fn receive_full_name() -> HandlerResult { Ok(()) } +//! # async fn invalid_state() -> HandlerResult { Ok(()) } +//! # async fn receive_product_selection() -> HandlerResult { Ok(()) } +//! # +//! fn schema() -> UpdateHandler> { +//! use dptree::case; //! -//! 1. It supports _extension_: e.g., you -//! can define extension filters or some other handlers and then combine them in -//! a single place, thus facilitating loose coupling. -//! 2. Pipelining exhibits a natural syntax for expressing message processing. -//! 3. Lastly, it provides a primitive form of [dependency injection (DI)], -//! which allows you to deal with such objects as a bot and various update types -//! easily. +//! let command_handler = teloxide::filter_command::() +//! .branch( +//! case![State::Start] +//! .branch(case![Command::Help].endpoint(help)) +//! .branch(case![Command::Start].endpoint(start)), +//! ) +//! .branch(case![Command::Cancel].endpoint(cancel)); //! -//! For a more involved example, see [`examples/dispatching_features.rs`](https://github.com/teloxide/teloxide/blob/master/examples/dispatching_features.rs). +//! let message_handler = Update::filter_message() +//! .branch(command_handler) +//! .branch(case![State::ReceiveFullName].endpoint(receive_full_name)) +//! .branch(dptree::endpoint(invalid_state)); //! -//! TODO: explain a more involved example with multiple branches. +//! let callback_query_handler = Update::filter_callback_query().branch( +//! case![State::ReceiveProductChoice { full_name }].endpoint(receive_product_selection), +//! ); //! +//! dialogue::enter::, State, _>() +//! .branch(message_handler) +//! .branch(callback_query_handler) +//! } +//! ``` +//! +//! The overall logic should be clear. Throughout the above example, we use +//! several techniques: +//! +//! - **Branching:** `a.branch(b)` roughly means "try to handle an update with +//! `a`, then, if it +//! neglects the update, try `b`". +//! - **Pattern matching:** We also use the [`dptree::case!`] macro +//! extensively, which acts as a filter on an enumeration: if it is of a +//! certain variant, it passes the variant's payload down the handler chain; +//! otherwise, it neglects an update. +//! - **Endpoints:** To specify the final function to handle an update, we use +//! [`dptree::Handler::endpoint`]. +//! +//! Notice the clear and uniform code structure: regardless of the dispatch +//! criteria, we use the same program constructions. In future, you may want to +//! introduce your application-specific filters or data structures to match upon +//! -- no problem, reuse [`dptree::Handler::filter`], [`dptree::case!`], and +//! other combinators in the same way! +//! +//! Finally, we define our endpoints like this: +//! +//! ```no_run +//! # use teloxide::{Bot, adaptors::AutoSend}; +//! # use teloxide::types::{Message, CallbackQuery}; +//! # use teloxide::dispatching::dialogue::{InMemStorage, Dialogue}; +//! # enum State{} +//! # +//! type MyDialogue = Dialogue>; +//! type HandlerResult = Result<(), Box>; +//! +//! async fn start(bot: AutoSend, msg: Message, dialogue: MyDialogue) -> HandlerResult { +//! todo!() +//! } +//! +//! async fn help(bot: AutoSend, msg: Message) -> HandlerResult { +//! todo!() +//! } +//! +//! async fn cancel(bot: AutoSend, msg: Message, dialogue: MyDialogue) -> HandlerResult { +//! todo!() +//! } +//! +//! async fn invalid_state(bot: AutoSend, msg: Message) -> HandlerResult { +//! todo!() +//! } +//! +//! async fn receive_full_name( +//! bot: AutoSend, +//! msg: Message, +//! dialogue: MyDialogue, +//! ) -> HandlerResult { +//! todo!() +//! } +//! +//! async fn receive_product_selection( +//! bot: AutoSend, +//! q: CallbackQuery, +//! dialogue: MyDialogue, +//! full_name: String, +//! ) -> HandlerResult { +//! todo!() +//! } +//! ``` +//! +//! Each parameter is supplied as a dependency by teloxide. In particular: +//! - `bot: AutoSend` comes from the dispatcher (see below); +//! - `msg: Message` comes from [`Update::filter_message`]; +//! - `q: CallbackQuery` comes from [`Update::filter_callback_query`]; +//! - `dialogue: MyDialogue` comes from [`dialogue::enter`]; +//! - `full_name: String` comes from `dptree::case![State::ReceiveProductChoice +//! { full_name }]`. +//! +//! Inside `main`, we plug the schema into [`Dispatcher`] like this: +//! +//! ```no_run +//! # use teloxide::Bot; +//! # use teloxide::requests::RequesterExt; +//! # use teloxide::dispatching::{Dispatcher, dialogue::InMemStorage}; +//! # enum State {} +//! # fn schema() -> teloxide::dispatching::UpdateHandler> { teloxide::dptree::entry() } +//! #[tokio::main] +//! async fn main() { +//! let bot = Bot::from_env().auto_send(); +//! +//! Dispatcher::builder(bot, schema()) +//! .dependencies(dptree::deps![InMemStorage::::new()]) +//! .enable_ctrlc_handler() +//! .build() +//! .dispatch() +//! .await; +//! } +//! ``` +//! +//! In a call to [`DispatcherBuilder::dependencies`], we specify a list of +//! additional dependencies that all handlers will receive as parameters. Here, +//! we only specify an in-memory storage of dialogues needed for +//! [`dialogue::enter`]. However, in production bots, you normally also pass a +//! database connection, configuration, and other stuff. +//! +//! All in all, [`dptree`] can be seen as an extensible alternative to pattern +//! matching, with support for [dependency injection (DI)] and a few other +//! useful features. See [`examples/dispatching_features.rs`] as a more involved +//! example. +//! +//! [`examples/purchase.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/purchase.rs +//! [`Update::filter_message`]: crate::types::Update::filter_message +//! [`Update::filter_callback_query`]: crate::types::Update::filter_callback_query //! [chain of responsibility]: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern //! [dependency injection (DI)]: https://en.wikipedia.org/wiki/Dependency_injection +//! [`examples/dispatching_features.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/dispatching_features.rs #[cfg(all(feature = "ctrlc_handler"))] pub mod repls; diff --git a/src/dispatching/dialogue.rs b/src/dispatching/dialogue.rs index a6661f1e..3af1ecc0 100644 --- a/src/dispatching/dialogue.rs +++ b/src/dispatching/dialogue.rs @@ -4,17 +4,19 @@ //! wrapper over [`Storage`] and a chat ID. All it does is provides convenient //! method for manipulating the dialogue state. [`Storage`] is where all //! dialogue states are stored; it can be either [`InMemStorage`], which is a -//! simple hash map, or database wrappers such as [`SqliteStorage`]. In the -//! latter case, your dialogues are _persistent_, meaning that you can safely -//! restart your bot and all dialogues will remain in the database -- this is a -//! preferred method for production bots. +//! simple hash map from [`std::collections`], or an advanced database wrapper +//! such as [`SqliteStorage`]. In the latter case, your dialogues are +//! _persistent_, meaning that you can safely restart your bot and all ongoing +//! dialogues will remain in the database -- this is a preferred method for +//! production bots. //! //! [`examples/dialogue.rs`] clearly demonstrates the typical usage of //! dialogues. Your dialogue state can be represented as an enumeration: //! //! ```ignore -//! #[derive(Clone)] +//! #[derive(Clone, Default)] //! pub enum State { +//! #[default] //! Start, //! ReceiveFullName, //! ReceiveAge { full_name: String }, @@ -30,8 +32,8 @@ //! bot: AutoSend, //! msg: Message, //! dialogue: MyDialogue, -//! (full_name,): (String,), // Available from `State::ReceiveAge`. -//! ) -> anyhow::Result<()> { +//! full_name: String, // Available from `State::ReceiveAge`. +//! ) -> HandlerResult { //! match msg.text().map(|text| text.parse::()) { //! Some(Ok(age)) => { //! bot.send_message(msg.chat.id, "What's your location?").await?; @@ -46,11 +48,12 @@ //! } //! ``` //! -//! Variant's fields are passed to state handlers as tuples: `(full_name,): -//! (String,)`. Using [`Dialogue::update`], you can update the dialogue with a -//! new state, in our case -- `State::ReceiveLocation { full_name, age }`. To -//! exit the dialogue, just call [`Dialogue::exit`] and it will be removed from -//! the inner storage: +//! Variant's fields are passed to state handlers as single arguments like +//! `full_name: String` or tuples in case of two or more variant parameters (see +//! below). Using [`Dialogue::update`], you can update the dialogue with a new +//! state, in our case -- `State::ReceiveLocation { full_name, age }`. To exit +//! the dialogue, just call [`Dialogue::exit`] and it will be removed from the +//! underlying storage: //! //! ```ignore //! async fn receive_location( diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index f458c23f..6ff568e6 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -11,17 +11,20 @@ use crate::{ use dptree::di::{DependencyMap, DependencySupplier}; use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; -use std::{ - collections::HashMap, - fmt::Debug, - hash::Hash, - ops::{ControlFlow, Deref}, - sync::Arc, -}; use tokio::time::timeout; use tokio_stream::wrappers::ReceiverStream; -use std::future::Future; +use std::{ + collections::HashMap, + fmt::Debug, + future::Future, + hash::Hash, + ops::{ControlFlow, Deref}, + sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, + }, +}; /// The builder for [`Dispatcher`]. pub struct DispatcherBuilder { @@ -30,6 +33,7 @@ pub struct DispatcherBuilder { handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, + ctrlc_handler: bool, distribution_f: fn(&Update) -> Option, worker_queue_size: usize, } @@ -75,6 +79,14 @@ where Self { dependencies, ..self } } + /// Enables the `^C` handler that [`shutdown`]s dispatching. + /// + /// [`shutdown`]: ShutdownToken::shutdown + #[cfg(feature = "ctrlc_handler")] + pub fn enable_ctrlc_handler(self) -> Self { + Self { ctrlc_handler: true, ..self } + } + /// Specifies size of the queue for workers. /// /// By default it's 64. @@ -98,6 +110,7 @@ where handler, default_handler, error_handler, + ctrlc_handler, distribution_f: _, worker_queue_size, } = self; @@ -108,6 +121,7 @@ where handler, default_handler, error_handler, + ctrlc_handler, distribution_f: f, worker_queue_size, } @@ -124,9 +138,10 @@ where error_handler, distribution_f, worker_queue_size, + ctrlc_handler, } = self; - Dispatcher { + let dp = Dispatcher { bot, dependencies, handler, @@ -137,7 +152,20 @@ where worker_queue_size, workers: HashMap::new(), default_worker: None, + current_number_of_active_workers: Default::default(), + max_number_of_active_workers: Default::default(), + }; + + #[cfg(feature = "ctrlc_handler")] + { + if ctrlc_handler { + let mut dp = dp; + dp.setup_ctrlc_handler_inner(); + return dp; + } } + + dp } } @@ -158,6 +186,8 @@ pub struct Dispatcher { distribution_f: fn(&Update) -> Option, worker_queue_size: usize, + current_number_of_active_workers: Arc, + max_number_of_active_workers: Arc, // Tokio TX channel parts associated with chat IDs that consume updates sequentially. workers: HashMap, // The default TX part that consume updates concurrently. @@ -171,6 +201,7 @@ pub struct Dispatcher { struct Worker { tx: tokio::sync::mpsc::Sender, handle: tokio::task::JoinHandle<()>, + is_waiting: Arc, } // TODO: it is allowed to return message as response on telegram request in @@ -204,6 +235,7 @@ where Box::pin(async {}) }), error_handler: LoggingErrorHandler::new(), + ctrlc_handler: false, worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE, distribution_f: default_distribution_function, } @@ -214,7 +246,7 @@ impl Dispatcher where R: Requester + Clone + Send + Sync + 'static, Err: Send + Sync + 'static, - Key: Hash + Eq, + Key: Hash + Eq + Clone, { /// Starts your bot with the default parameters. /// @@ -230,7 +262,6 @@ where /// - [`crate::types::Me`] (can be used in [`HandlerExt::filter_command`]). /// /// [`shutdown`]: ShutdownToken::shutdown - /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler /// [`HandlerExt::filter_command`]: crate::dispatching::HandlerExt::filter_command pub async fn dispatch(&mut self) where @@ -250,7 +281,6 @@ where /// This method adds the same dependencies as [`Dispatcher::dispatch`]. /// /// [`shutdown`]: ShutdownToken::shutdown - /// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>( &'a mut self, mut update_listener: UListener, @@ -280,6 +310,8 @@ where tokio::pin!(stream); loop { + self.remove_inactive_workers_if_needed().await; + // False positive #[allow(clippy::collapsible_match)] if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await { @@ -342,6 +374,8 @@ where handler, default_handler, error_handler, + Arc::clone(&self.current_number_of_active_workers), + Arc::clone(&self.max_number_of_active_workers), self.worker_queue_size, ) }), @@ -367,11 +401,68 @@ where } } + async fn remove_inactive_workers_if_needed(&mut self) { + let workers = self.workers.len(); + let max = self.max_number_of_active_workers.load(Ordering::Relaxed) as usize; + + if workers <= max { + return; + } + + self.remove_inactive_workers().await; + } + + #[inline(never)] // Cold function. + async fn remove_inactive_workers(&mut self) { + let handles = self + .workers + .iter() + .filter(|(_, worker)| { + worker.tx.capacity() == self.worker_queue_size + && worker.is_waiting.load(Ordering::Relaxed) + }) + .map(|(k, _)| k) + .cloned() + .collect::>() + .into_iter() + .map(|key| { + let Worker { tx, handle, .. } = self.workers.remove(&key).unwrap(); + + // Close channel, worker should stop almost immediately + // (it's been supposedly waiting on the channel) + drop(tx); + + handle + }); + + for handle in handles { + // We must wait for worker to stop anyway, even though it should stop + // immediately. This helps in case if we've checked that the worker + // is waiting in between it received the update and set the flag. + let _ = handle.await; + } + } + /// Setups the `^C` handler that [`shutdown`]s dispatching. /// /// [`shutdown`]: ShutdownToken::shutdown #[cfg(feature = "ctrlc_handler")] + #[deprecated(since = "0.10.0", note = "use `enable_ctrlc_handler` on builder instead")] pub fn setup_ctrlc_handler(&mut self) -> &mut Self { + self.setup_ctrlc_handler_inner(); + self + } + + /// Returns a shutdown token, which can later be used to shutdown + /// dispatching. + pub fn shutdown_token(&self) -> ShutdownToken { + self.state.clone() + } +} + +impl Dispatcher { + #[cfg(feature = "ctrlc_handler")] + fn setup_ctrlc_handler_inner(&mut self) { let token = self.state.clone(); tokio::spawn(async move { loop { @@ -389,14 +480,6 @@ where } } }); - - self - } - - /// Returns a shutdown token, which can later be used to shutdown - /// dispatching. - pub fn shutdown_token(&self) -> ShutdownToken { - self.state.clone() } } @@ -405,25 +488,40 @@ fn spawn_worker( handler: Arc>, default_handler: DefaultHandler, error_handler: Arc + Send + Sync>, + current_number_of_active_workers: Arc, + max_number_of_active_workers: Arc, queue_size: usize, ) -> Worker where Err: Send + Sync + 'static, { - let (tx, rx) = tokio::sync::mpsc::channel(queue_size); + let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size); + let is_waiting = Arc::new(AtomicBool::new(true)); + let is_waiting_local = Arc::clone(&is_waiting); let deps = Arc::new(deps); - let handle = tokio::spawn(ReceiverStream::new(rx).for_each(move |update| { - let deps = Arc::clone(&deps); - let handler = Arc::clone(&handler); - let default_handler = Arc::clone(&default_handler); - let error_handler = Arc::clone(&error_handler); + let handle = tokio::spawn(async move { + while let Some(update) = rx.recv().await { + is_waiting_local.store(false, Ordering::Relaxed); + { + let current = current_number_of_active_workers.fetch_add(1, Ordering::Relaxed) + 1; + max_number_of_active_workers.fetch_max(current, Ordering::Relaxed); + } - handle_update(update, deps, handler, default_handler, error_handler) - })); + let deps = Arc::clone(&deps); + let handler = Arc::clone(&handler); + let default_handler = Arc::clone(&default_handler); + let error_handler = Arc::clone(&error_handler); - Worker { tx, handle } + handle_update(update, deps, handler, default_handler, error_handler).await; + + current_number_of_active_workers.fetch_sub(1, Ordering::Relaxed); + is_waiting_local.store(true, Ordering::Relaxed); + } + }); + + Worker { tx, handle, is_waiting } } fn spawn_default_worker( @@ -449,7 +547,7 @@ where handle_update(update, deps, handler, default_handler, error_handler) })); - Worker { tx, handle } + Worker { tx, handle, is_waiting: Arc::new(AtomicBool::new(true)) } } async fn handle_update( diff --git a/src/dispatching/distribution.rs b/src/dispatching/distribution.rs index 208e0018..2089b7b6 100644 --- a/src/dispatching/distribution.rs +++ b/src/dispatching/distribution.rs @@ -1,7 +1,7 @@ use teloxide_core::types::{ChatId, Update}; /// Default distribution key for dispatching. -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Hash, PartialEq, Eq, Clone)] pub struct DefaultKey(ChatId); pub(crate) fn default_distribution_function(update: &Update) -> Option { diff --git a/src/dispatching/handler_description.rs b/src/dispatching/handler_description.rs index cf3b4e4f..3ed2f621 100644 --- a/src/dispatching/handler_description.rs +++ b/src/dispatching/handler_description.rs @@ -1,44 +1,27 @@ use std::collections::HashSet; -use dptree::{description::EventKind, HandlerDescription}; +use dptree::{ + description::{EventKind, InterestSet}, + HandlerDescription, +}; use teloxide_core::types::AllowedUpdate; /// Handler description that is used by [`Dispatcher`]. /// /// [`Dispatcher`]: crate::dispatching::Dispatcher pub struct DpHandlerDescription { - allowed: EventKind, + allowed: InterestSet, } impl DpHandlerDescription { pub(crate) fn of(allowed: AllowedUpdate) -> Self { let mut set = HashSet::with_capacity(1); - set.insert(allowed); - Self { allowed: EventKind::InterestList(set) } + set.insert(Kind(allowed)); + Self { allowed: InterestSet::new_filter(set) } } pub(crate) fn allowed_updates(&self) -> Vec { - use AllowedUpdate::*; - - match &self.allowed { - EventKind::InterestList(set) => set.iter().copied().collect(), - EventKind::Entry => panic!("No updates were allowed"), - EventKind::UserDefined => vec![ - Message, - EditedMessage, - ChannelPost, - EditedChannelPost, - InlineQuery, - ChosenInlineResult, - CallbackQuery, - ShippingQuery, - PreCheckoutQuery, - Poll, - PollAnswer, - MyChatMember, - ChatMember, - ], - } + self.allowed.observed.iter().map(|Kind(x)| x).copied().collect() } } @@ -59,3 +42,70 @@ impl HandlerDescription for DpHandlerDescription { Self { allowed: self.allowed.merge_branch(&other.allowed) } } } + +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] +struct Kind(AllowedUpdate); + +impl EventKind for Kind { + fn full_set() -> HashSet { + use AllowedUpdate::*; + + [ + Message, + EditedMessage, + ChannelPost, + EditedChannelPost, + InlineQuery, + ChosenInlineResult, + CallbackQuery, + ShippingQuery, + PreCheckoutQuery, + Poll, + PollAnswer, + MyChatMember, + ChatMember, + ] + .into_iter() + .map(Kind) + .collect() + } + + fn empty_set() -> HashSet { + HashSet::new() + } +} + +#[cfg(test)] +mod tests { + use crate::{ + dispatching::{HandlerExt, UpdateFilterExt}, + types::{AllowedUpdate::*, Update}, + utils::command::BotCommands, + }; + + use crate as teloxide; // fixup for the `BotCommands` macro + + #[derive(BotCommands, Clone)] + #[command(rename = "lowercase")] + enum Cmd { + B, + } + + // + #[test] + fn discussion_648() { + let h = + dptree::entry().branch(Update::filter_my_chat_member().endpoint(|| async {})).branch( + Update::filter_message() + .branch(dptree::entry().filter_command::().endpoint(|| async {})) + .endpoint(|| async {}), + ); + + let mut v = h.description().allowed_updates(); + + // Hash set randomizes element order, so to compare we need to sort + v.sort_by_key(|&a| a as u8); + + assert_eq!(v, [Message, MyChatMember]) + } +} diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index fd04964a..7f7db10f 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -14,7 +14,12 @@ use teloxide_core::requests::Requester; /// /// All errors from an update listener and handler will be logged. /// +/// REPLs are meant only for simple bots and rapid prototyping. If you need to +/// supply dependencies or describe more complex dispatch logic, please use +/// [`Dispatcher`]. +/// /// ## Caution +/// /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// because Telegram disallow multiple requests at the same time from the same /// bot. @@ -49,7 +54,12 @@ where /// /// All errors from an update listener and handler will be logged. /// +/// REPLs are meant only for simple bots and rapid prototyping. If you need to +/// supply dependencies or describe more complex dispatch logic, please use +/// [`Dispatcher`]. +/// /// ## Caution +/// /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// because Telegram disallow multiple requests at the same time from the same /// bot. @@ -86,8 +96,8 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>( Update::filter_message().filter_command::().chain(dptree::endpoint(handler)), ) .default_handler(ignore_update) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index eec73f1f..cecf90ad 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -11,7 +11,12 @@ use teloxide_core::requests::Requester; /// /// All errors from an update listener and a handler will be logged. /// -/// # Caution +/// REPLs are meant only for simple bots and rapid prototyping. If you need to +/// supply dependencies or describe more complex dispatch logic, please use +/// [`Dispatcher`]. +/// +/// ## Caution +/// /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// because Telegram disallow multiple requests at the same time from the same /// bot. @@ -35,7 +40,12 @@ where /// /// All errors from an update listener and handler will be logged. /// +/// REPLs are meant only for simple bots and rapid prototyping. If you need to +/// supply dependencies or describe more complex dispatch logic, please use +/// [`Dispatcher`]. +/// /// # Caution +/// /// **DO NOT** use this function together with [`Dispatcher`] and other REPLs, /// because Telegram disallow multiple requests at the same time from the same /// bot. @@ -61,8 +71,8 @@ where Dispatcher::builder(bot, Update::filter_message().chain(dptree::endpoint(handler))) .default_handler(ignore_update) + .enable_ctrlc_handler() .build() - .setup_ctrlc_handler() .dispatch_with_listener( listener, LoggingErrorHandler::with_custom_text("An error from the update listener"), diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index 4c01d174..9c4abc2f 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -27,7 +27,7 @@ /// Implementations of webhook update listeners - an alternative (to /// [`fn@polling`]) way of receiving updates from telegram. -#[cfg(any(feature = "webhooks-axum"))] +#[cfg(feature = "webhooks")] pub mod webhooks; use futures::Stream; @@ -42,8 +42,9 @@ use crate::{ mod polling; mod stateful_listener; +#[allow(deprecated)] pub use self::{ - polling::{polling, polling_default}, + polling::{polling, polling_default, Polling, PollingBuilder, PollingStream}, stateful_listener::StatefulListener, }; @@ -125,3 +126,11 @@ pub trait AsUpdateStream<'a, E> { /// [`Stream`]: AsUpdateStream::Stream fn as_stream(&'a mut self) -> Self::Stream; } + +#[inline(always)] +pub(crate) fn assert_update_listener(listener: L) -> L +where + L: UpdateListener, +{ + listener +} diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index 3987779d..118fc02e 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -1,89 +1,200 @@ -use std::{convert::TryInto, time::Duration}; - -use futures::{ - future::{ready, Either}, - stream::{self, Stream, StreamExt}, +use std::{ + convert::TryInto, + future::Future, + pin::Pin, + task::{ + self, + Poll::{self, Ready}, + }, + time::Duration, + vec, }; +use futures::{ready, stream::Stream}; + use crate::{ dispatching::{ stop_token::{AsyncStopFlag, AsyncStopToken}, - update_listeners::{stateful_listener::StatefulListener, UpdateListener}, + update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener}, }, - payloads::{GetUpdates, GetUpdatesSetters as _}, requests::{HasPayload, Request, Requester}, types::{AllowedUpdate, Update}, }; -/// Returns a long polling update listener with `timeout` of 10 seconds. +/// Builder for polling update listener. /// -/// See also: [`polling`](polling). -/// -/// ## Notes -/// -/// This function will automatically delete a webhook if it was set up. -pub async fn polling_default(requester: R) -> impl UpdateListener +/// Can be created by [`Polling::builder`]. +#[non_exhaustive] +#[must_use = "`PollingBuilder` is a builder and does nothing unless used"] +pub struct PollingBuilder { + pub bot: R, + pub timeout: Option, + pub limit: Option, + pub allowed_updates: Option>, + pub drop_pending_updates: bool, +} + +impl PollingBuilder where R: Requester + Send + 'static, ::GetUpdates: Send, { - delete_webhook_if_setup(&requester).await; - polling(requester, Some(Duration::from_secs(10)), None, None) + /// A timeout in seconds for polling. + /// + /// ## Note + /// + /// `timeout` should not be bigger than http client timeout, see + /// [`default_reqwest_settings`] for default http client settings. + /// + /// [`default_reqwest_settings`]: crate::net::default_reqwest_settings + pub fn timeout(self, timeout: Duration) -> Self { + Self { timeout: Some(timeout), ..self } + } + + /// Limit the number of updates to be retrieved at once. Values between + /// 1—100 are accepted. + /// + /// ## Panics + /// + /// If `limit` is 0 or greater than 100. + #[track_caller] + pub fn limit(self, limit: u8) -> Self { + assert_ne!(limit, 0, "limit can't be 0"); + assert!(limit <= 100, "maximum limit is 100, can't set limit to `{limit}`"); + + Self { limit: Some(limit), ..self } + } + + /// A list of the types of updates you want to receive. + /// + /// ## Note + /// + /// Teloxide normally (when using [`Dispatcher`] or [`repl`]s) sets this + /// automatically via [`hint_allowed_updates`], so you rarely need to use + /// `allowed_updates` explicitly. + /// + /// [`Dispatcher`]: crate::dispatching::Dispatcher + /// [`repl`]: fn@crate::repl + /// [`hint_allowed_updates`]: crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates + pub fn allowed_updates(self, allowed_updates: Vec) -> Self { + Self { allowed_updates: Some(allowed_updates), ..self } + } + + /// Drops pending updates. + pub fn drop_pending_updates(self) -> Self { + Self { drop_pending_updates: true, ..self } + } + + /// Deletes webhook if it was set up. + pub async fn delete_webhook(self) -> Self { + delete_webhook_if_setup(&self.bot).await; + + self + } + + /// Returns a long polling update listener with configuration from the + /// builder. + /// + /// See also: [`polling_default`], [`Polling`]. + pub fn build(self) -> Polling { + let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self; + let (token, flag) = AsyncStopToken::new_pair(); + let polling = + Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token }; + + assert_update_listener(polling) + } } -#[cfg_attr(doc, aquamarine::aquamarine)] -/// Returns a long polling update listener with some additional options. +/// Returns a long polling update listener with `timeout` of 10 seconds. /// -/// - `bot`: Using this bot, the returned update listener will receive updates. -/// - `timeout`: A timeout in seconds for polling. -/// - `limit`: Limits the number of updates to be retrieved at once. Values -/// between 1—100 are accepted. -/// - `allowed_updates`: A list the types of updates you want to receive. -/// -/// See [`GetUpdates`] for defaults. -/// -/// See also: [`polling_default`](polling_default). +/// See also: [`Polling::builder`]. /// /// ## Notes /// -/// - `timeout` should not be bigger than http client timeout, see -/// [`default_reqwest_settings`] for default http client settings. -/// - [`repl`]s and [`Dispatcher`] use [`hint_allowed_updates`] to set -/// `allowed_updates`, so you rarely need to pass `allowed_updates` -/// explicitly. -/// -/// [`default_reqwest_settings`]: teloxide::net::default_reqwest_settings -/// [`repl`]: fn@crate::repl -/// [`Dispatcher`]: crate::dispatching::Dispatcher -/// [`hint_allowed_updates`]: -/// crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates +/// This function will automatically delete a webhook if it was set up. +pub async fn polling_default(bot: R) -> Polling +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + let polling = + Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build(); + + assert_update_listener(polling) +} + +/// Returns a long polling update listener with some additional options. +#[deprecated(since = "0.10.0", note = "use `Polling::builder()` instead")] +pub fn polling( + bot: R, + timeout: Option, + limit: Option, + allowed_updates: Option>, +) -> Polling +where + R: Requester + Send + 'static, + ::GetUpdates: Send, +{ + let mut builder = Polling::builder(bot); + builder.timeout = timeout; + builder.limit = limit; + builder.allowed_updates = allowed_updates; + assert_update_listener(builder.build()) +} + +async fn delete_webhook_if_setup(requester: &R) +where + R: Requester, +{ + let webhook_info = match requester.get_webhook_info().send().await { + Ok(ok) => ok, + Err(e) => { + log::error!("Failed to get webhook info: {:?}", e); + return; + } + }; + + let is_webhook_setup = webhook_info.url.is_some(); + + if is_webhook_setup { + if let Err(e) = requester.delete_webhook().send().await { + log::error!("Failed to delete a webhook: {:?}", e); + } + } +} + +#[cfg_attr(doc, aquamarine::aquamarine)] +/// A polling update listener. /// /// ## How it works /// -/// Long polling works by repeatedly calling [`Bot::get_updates`][get_updates]. -/// If telegram has any updates, it returns them immediately, otherwise it waits -/// until either it has any updates or `timeout` expires. +/// Long polling works by repeatedly calling +/// [`Bot::get_updates`][get_updates]. If telegram has any updates, it +/// returns them immediately, otherwise it waits until either it has any +/// updates or `timeout` expires. /// -/// Each [`get_updates`][get_updates] call includes an `offset` parameter equal -/// to the latest update id + one, that allows to only receive updates that has -/// not been received before. +/// Each [`get_updates`][get_updates] call includes an `offset` parameter +/// equal to the latest update id + one, that allows to only receive +/// updates that has not been received before. /// -/// When telegram receives a [`get_updates`][get_updates] request with `offset = -/// N` it forgets any updates with id < `N`. When `polling` listener is stopped, -/// it sends [`get_updates`][get_updates] with `timeout = 0, limit = 1` and -/// appropriate `offset`, so future bot restarts won't see updates that were -/// already seen. +/// When telegram receives a [`get_updates`][get_updates] request with +/// `offset = N` it forgets any updates with id < `N`. When `polling` +/// listener is stopped, it sends [`get_updates`][get_updates] with +/// `timeout = 0, limit = 1` and appropriate `offset`, so future bot +/// restarts won't see updates that were already seen. /// -/// Consumers of a `polling` update listener then need to repeatedly call +/// Consumers of a [`Polling`] update listener then need to repeatedly call /// [`futures::StreamExt::next`] to get the updates. /// -/// Here is an example diagram that shows these interactions between consumers -/// like [`Dispatcher`], `polling` update listener and telegram. +/// Here is an example diagram that shows these interactions between +/// consumers like [`Dispatcher`], [`Polling`] update listener and +/// telegram. /// /// ```mermaid /// sequenceDiagram /// participant C as Consumer -/// participant P as polling +/// participant P as Polling /// participant T as Telegram /// /// link C: Dispatcher @ ../struct.Dispatcher.html @@ -123,131 +234,180 @@ where /// ``` /// /// [get_updates]: crate::requests::Requester::get_updates -pub fn polling( - bot: R, +/// [`Dispatcher`]: crate::dispatching::Dispatcher +#[must_use = "`Polling` is an update listener and does nothing unless used"] +pub struct Polling { + bot: B, timeout: Option, limit: Option, allowed_updates: Option>, -) -> impl UpdateListener + drop_pending_updates: bool, + flag: AsyncStopFlag, + token: AsyncStopToken, +} + +impl Polling where R: Requester + Send + 'static, ::GetUpdates: Send, { - struct State { - bot: B, - timeout: Option, - limit: Option, - allowed_updates: Option>, - offset: i32, - flag: AsyncStopFlag, - token: AsyncStopToken, - force_stop: bool, + /// Returns a builder for polling update listener. + pub fn builder(bot: R) -> PollingBuilder { + PollingBuilder { + bot, + timeout: None, + limit: None, + allowed_updates: None, + drop_pending_updates: false, + } } - - fn stream(st: &mut State) -> impl Stream> + Send + '_ - where - B: Requester + Send, - ::GetUpdates: Send, - { - stream::unfold(st, move |state| async move { - let State { timeout, limit, allowed_updates, bot, offset, flag, force_stop, .. } = - &mut *state; - - if *force_stop { - return None; - } - - if flag.is_stopped() { - let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1); - req.payload_mut().allowed_updates = allowed_updates.take(); - - return match req.send().await { - Ok(_) => None, - Err(err) => { - // Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496 - *force_stop = true; - - Some((Either::Left(stream::once(ready(Err(err)))), state)) - } - }; - } - - let mut req = bot.get_updates(); - *req.payload_mut() = GetUpdates { - offset: Some(*offset), - timeout: *timeout, - limit: *limit, - allowed_updates: allowed_updates.take(), - }; - - match req.send().await { - Ok(updates) => { - // Set offset to the last update's id + 1 - if let Some(upd) = updates.last() { - *offset = upd.id + 1; - } - - let updates = updates.into_iter().map(Ok); - Some((Either::Right(stream::iter(updates)), state)) - } - Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), - } - }) - .flatten() - } - - let (token, flag) = AsyncStopToken::new_pair(); - - let state = State { - bot, - timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")), - limit, - allowed_updates, - offset: 0, - flag, - token, - force_stop: false, - }; - - let stop_token = |st: &mut State<_>| st.token.clone(); - - let hint_allowed_updates = - Some(|state: &mut State<_>, allowed: &mut dyn Iterator| { - // TODO: we should probably warn if there already were different allowed updates - // before - state.allowed_updates = Some(allowed.collect()); - }); - let timeout_hint = Some(move |_: &State<_>| timeout); - - StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint) } -async fn delete_webhook_if_setup(requester: &R) -where - R: Requester, -{ - let webhook_info = match requester.get_webhook_info().send().await { - Ok(ok) => ok, - Err(e) => { - log::error!("Failed to get webhook info: {:?}", e); - return; - } - }; +#[pin_project::pin_project] +pub struct PollingStream<'a, B: Requester> { + /// Parent structure + polling: &'a mut Polling, - let is_webhook_setup = webhook_info.url.is_some(); + /// Whatever to drop pending updates or not. + drop_pending_updates: bool, - if is_webhook_setup { - if let Err(e) = requester.delete_webhook().send().await { - log::error!("Failed to delete a webhook: {:?}", e); + /// Timeout parameter for normal `get_updates()` calls. + timeout: Option, + /// Allowed updates parameter for the first `get_updates()` call. + allowed_updates: Option>, + /// Offset parameter for normal `get_updates()` calls. + offset: i32, + + /// If this is set, return `None` from `poll_next` immediately. + force_stop: bool, + /// If true we've sent last `get_updates()` call for graceful shutdown. + stopping: bool, + + /// Buffer of updates to be yielded. + buffer: vec::IntoIter, + + /// In-flight `get_updates()` call. + #[pin] + in_flight: Option<::Send>, +} + +impl UpdateListener for Polling { + type StopToken = AsyncStopToken; + + fn stop_token(&mut self) -> Self::StopToken { + self.token.clone() + } + + fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator) { + // TODO: we should probably warn if there already were different allowed updates + // before + self.allowed_updates = Some(hint.collect()); + } + + fn timeout_hint(&self) -> Option { + self.timeout + } +} + +impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling { + type Stream = PollingStream<'a, B>; + + fn as_stream(&'a mut self) -> Self::Stream { + let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")); + let allowed_updates = self.allowed_updates.clone(); + let drop_pending_updates = self.drop_pending_updates; + PollingStream { + polling: self, + drop_pending_updates, + timeout, + allowed_updates, + offset: 0, + force_stop: false, + stopping: false, + buffer: Vec::new().into_iter(), + in_flight: None, } } } +impl Stream for PollingStream<'_, B> { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + + if *this.force_stop { + return Ready(None); + } + + // Poll in-flight future until completion + if let Some(in_flight) = this.in_flight.as_mut().as_pin_mut() { + let res = ready!(in_flight.poll(cx)); + this.in_flight.set(None); + + match res { + Ok(_) if *this.stopping => return Ready(None), + Err(err) if *this.stopping => { + // Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496 + *this.force_stop = true; + + return Ready(Some(Err(err))); + } + Ok(updates) => { + if let Some(upd) = updates.last() { + *this.offset = upd.id + 1; + } + + match *this.drop_pending_updates { + false => *this.buffer = updates.into_iter(), + true => *this.drop_pending_updates = false, + } + } + Err(err) => return Ready(Some(Err(err))), + } + } + + // If there are any buffered updates, return one + if let Some(upd) = this.buffer.next() { + return Ready(Some(Ok(upd))); + } + + *this.stopping = this.polling.flag.is_stopped(); + let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) { + // Normal `get_updates()` call + (false, false) => (*this.offset, this.polling.limit, *this.timeout), + // Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending + // updates) + // + // When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()` + // set last seen update (offset) and return immediately + (true, _) => (*this.offset, Some(1), Some(0)), + // Drop pending updates + (_, true) => (-1, Some(1), Some(0)), + }; + + let req = this + .polling + .bot + .get_updates() + .with_payload_mut(|pay| { + pay.offset = Some(offset); + pay.timeout = timeout; + pay.limit = limit; + pay.allowed_updates = this.allowed_updates.take(); + }) + .send(); + this.in_flight.set(Some(req)); + + // Recurse to poll `self.in_flight` + self.poll_next(cx) + } +} + #[test] fn polling_is_send() { - use crate::dispatching::update_listeners::AsUpdateStream; - let bot = crate::Bot::new("TOKEN"); + #[allow(deprecated)] let mut polling = polling(bot, None, None, None); assert_send(&polling); diff --git a/src/dispatching/update_listeners/webhooks.rs b/src/dispatching/update_listeners/webhooks.rs index b20afd86..ef94fec4 100644 --- a/src/dispatching/update_listeners/webhooks.rs +++ b/src/dispatching/update_listeners/webhooks.rs @@ -4,6 +4,7 @@ use std::net::SocketAddr; use crate::{requests::Requester, types::InputFile}; /// Options related to setting up webhooks. +#[must_use] pub struct Options { /// Local address to listen to. pub address: SocketAddr, @@ -42,13 +43,28 @@ pub struct Options { /// /// Default - false. pub drop_pending_updates: bool, + + /// A secret token to be sent in a header “X-Telegram-Bot-Api-Secret-Token” + /// in every webhook request, 1-256 characters. Only characters `A-Z`, + /// `a-z`, `0-9`, `_` and `-` are allowed. The header is useful to ensure + /// that the request comes from a webhook set by you. + /// + /// Default - teloxide will generate a random token. + pub secret_token: Option, } impl Options { /// Construct a new webhook options, see [`Options::address`] and /// [`Options::url`] for details. pub fn new(address: SocketAddr, url: url::Url) -> Self { - Self { address, url, certificate: None, max_connections: None, drop_pending_updates: false } + Self { + address, + url, + certificate: None, + max_connections: None, + drop_pending_updates: false, + secret_token: None, + } } /// Upload your public key certificate so that the root certificate in use @@ -71,6 +87,32 @@ impl Options { pub fn drop_pending_updates(self) -> Self { Self { drop_pending_updates: true, ..self } } + + /// A secret token to be sent in a header “X-Telegram-Bot-Api-Secret-Token” + /// in every webhook request, 1-256 characters. Only characters `A-Z`, + /// `a-z`, `0-9`, `_` and `-` are allowed. The header is useful to ensure + /// that the request comes from a webhook set by you. + /// + /// ## Panics + /// + /// If the token is invalid. + #[track_caller] + pub fn secret_token(self, token: String) -> Self { + check_secret(token.as_bytes()).expect("Invalid secret token"); + + Self { secret_token: Some(token), ..self } + } + + /// Returns `self.secret_token`, generating a new one if it's `None`. + /// + /// After a call to this function `self.secret_token` is always `Some(_)`. + /// + /// **Note**: if you leave webhook setup to teloxide, it will automatically + /// generate a secret token. Call this function only if you need to know the + /// secret (for example because you are calling `set_webhook` by yourself). + pub fn get_or_gen_secret_token(&mut self) -> &str { + self.secret_token.get_or_insert_with(gen_secret_token) + } } #[cfg(feature = "webhooks-axum")] @@ -91,6 +133,7 @@ where use crate::requests::Request; use teloxide_core::requests::HasPayload; + let secret = options.get_or_gen_secret_token().to_owned(); let &mut Options { ref url, ref mut certificate, max_connections, drop_pending_updates, .. } = options; @@ -99,12 +142,47 @@ where req.payload_mut().certificate = certificate.take(); req.payload_mut().max_connections = max_connections; req.payload_mut().drop_pending_updates = Some(drop_pending_updates); + req.payload_mut().secret_token = Some(secret); req.send().await?; Ok(()) } +/// Generates a random string consisting of 32 characters (`a-z`, `A-Z`, `0-9`, +/// `_` and `-`). +fn gen_secret_token() -> String { + use rand::{distributions::Uniform, Rng}; + const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"; + const SECRET_LENGTH: usize = 32; + + let random = rand::thread_rng() + .sample_iter(Uniform::new(0, CHARSET.len())) + .map(|idx| CHARSET[idx] as char) + .take(SECRET_LENGTH); + + let mut secret = String::with_capacity(SECRET_LENGTH); + secret.extend(random); + + secret +} + +fn check_secret(bytes: &[u8]) -> Result<&[u8], &'static str> { + let len = bytes.len(); + + if !(1..=256).contains(&len) { + return Err("secret token length must be in range 1..=256"); + } + + let is_not_supported = + |c: &_| !matches!(c, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-'); + if bytes.iter().any(is_not_supported) { + return Err("secret token must only contain of `a-z`, `A-Z`, `0-9`, `_` and `-` characters"); + } + + Ok(bytes) +} + /// Returns first (`.0`) field from a tuple as a `&mut` reference. /// /// This hack is needed because there isn't currently a way to easily force a diff --git a/src/dispatching/update_listeners/webhooks/axum.rs b/src/dispatching/update_listeners/webhooks/axum.rs index e145c4b2..6c16bdf0 100644 --- a/src/dispatching/update_listeners/webhooks/axum.rs +++ b/src/dispatching/update_listeners/webhooks/axum.rs @@ -1,12 +1,14 @@ -use std::convert::Infallible; +use std::{convert::Infallible, future::Future, pin::Pin}; + +use axum::{ + extract::{FromRequest, RequestParts}, + http::status::StatusCode, +}; use crate::{ dispatching::{ stop_token::{AsyncStopFlag, StopToken}, - update_listeners::{ - webhooks::{setup_webhook, tuple_first_mut, Options}, - UpdateListener, - }, + update_listeners::{webhooks::Options, UpdateListener}, }, requests::Requester, }; @@ -105,15 +107,12 @@ where pub async fn axum_to_router( bot: R, mut options: Options, -) -> Result< - (impl UpdateListener, impl std::future::Future + Send, axum::Router), - R::Err, -> +) -> Result<(impl UpdateListener, impl Future + Send, axum::Router), R::Err> where R: Requester + Send, ::DeleteWebhook: Send, { - use crate::requests::Request; + use crate::{dispatching::update_listeners::webhooks::setup_webhook, requests::Request}; use futures::FutureExt; setup_webhook(&bot, &mut options).await?; @@ -149,12 +148,15 @@ where /// function. pub fn axum_no_setup( options: Options, -) -> (impl UpdateListener, impl std::future::Future, axum::Router) { +) -> (impl UpdateListener, impl Future, axum::Router) { use crate::{ - dispatching::{stop_token::AsyncStopToken, update_listeners}, + dispatching::{ + stop_token::AsyncStopToken, + update_listeners::{self, webhooks::tuple_first_mut}, + }, types::Update, }; - use axum::{extract::Extension, http::StatusCode, response::IntoResponse, routing::post}; + use axum::{extract::Extension, response::IntoResponse, routing::post}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tower::ServiceBuilder; @@ -167,9 +169,16 @@ pub fn axum_no_setup( async fn telegram_request( input: String, + secret_header: XTelegramBotApiSecretToken, + secret: Extension>, tx: Extension, flag: Extension, ) -> impl IntoResponse { + // FIXME: use constant time comparison here + if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) { + return StatusCode::UNAUTHORIZED; + } + let tx = match tx.get() { None => return StatusCode::SERVICE_UNAVAILABLE, // Do not process updates after `.stop()` is called even if the server is still @@ -206,6 +215,7 @@ pub fn axum_no_setup( .layer(TraceLayer::new_for_http()) .layer(Extension(ClosableSender::new(tx))) .layer(Extension(stop_flag.clone())) + .layer(Extension(options.secret_token)) .into_inner(), ); @@ -245,3 +255,32 @@ impl ClosableSender { self.origin.write().unwrap().take(); } } + +struct XTelegramBotApiSecretToken(Option>); + +impl FromRequest for XTelegramBotApiSecretToken { + type Rejection = StatusCode; + + fn from_request<'l0, 'at>( + req: &'l0 mut RequestParts, + ) -> Pin> + Send + 'at>> + where + 'l0: 'at, + Self: 'at, + { + use crate::dispatching::update_listeners::webhooks::check_secret; + + let res = req + .headers_mut() + .remove("x-telegram-bot-api-secret-token") + .map(|header| { + check_secret(header.as_bytes()) + .map(<_>::to_owned) + .map_err(|_| StatusCode::BAD_REQUEST) + }) + .transpose() + .map(Self); + + Box::pin(async { res }) as _ + } +} diff --git a/src/features.md b/src/features.md new file mode 100644 index 00000000..2c4b2951 --- /dev/null +++ b/src/features.md @@ -0,0 +1,34 @@ +## Cargo features + +| Feature | Description | +|----------------------|--------------------------------------------------------------------------------------------| +| `webhooks` | Enables general webhook utilities (almost useless on its own) | +| `webhooks-axum` | Enables webhook implementation based on axum framework | +| `macros` | Re-exports macros from [`teloxide-macros`]. | +| `ctrlc_handler` | Enables the [`DispatcherBuilder::enable_ctrlc_handler`] function (**enabled by default**). | +| `auto-send` | Enables the [`AutoSend`](adaptors::AutoSend) bot adaptor (**enabled by default**). | +| `throttle` | Enables the [`Throttle`](adaptors::Throttle) bot adaptor. | +| `cache-me` | Enables the [`CacheMe`](adaptors::CacheMe) bot adaptor. | +| `trace-adaptor` | Enables the [`Trace`](adaptors::Trace) bot adaptor. | +| `erased` | Enables the [`ErasedRequester`](adaptors::ErasedRequester) bot adaptor. | +| `full` | Enables all the features except `nightly`. | +| `nightly` | Enables nightly-only features (see the [teloxide-core features]). | +| `native-tls` | Enables the [`native-tls`] TLS implementation (**enabled by default**). | +| `rustls` | Enables the [`rustls`] TLS implementation. | +| `redis-storage` | Enables the [Redis] storage support for dialogues. | +| `sqlite-storage` | Enables the [Sqlite] storage support for dialogues. | +| `cbor-serializer` | Enables the [CBOR] serializer for dialogues. | +| `bincode-serializer` | Enables the [Bincode] serializer for dialogues. | + + +[Redis]: https://redis.io/ +[Sqlite]: https://www.sqlite.org/ +[CBOR]: https://en.wikipedia.org/wiki/CBOR +[Bincode]: https://github.com/servo/bincode +[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros +[`native-tls`]: https://docs.rs/native-tls +[`rustls`]: https://docs.rs/rustls +[`teloxide::utils::UpState`]: utils::UpState +[teloxide-core features]: https://docs.rs/teloxide-core/latest/teloxide_core/#cargo-features + +[`DispatcherBuilder::enable_ctrlc_handler`]: dispatching::DispatcherBuilder::enable_ctrlc_handler \ No newline at end of file diff --git a/src/features.txt b/src/features.txt deleted file mode 100644 index 32d737b3..00000000 --- a/src/features.txt +++ /dev/null @@ -1,29 +0,0 @@ -## Cargo features - -| Feature | Description | -|----------|----------| -| `redis-storage` | Enables the [Redis] storage support for dialogues.| -| `sqlite-storage` | Enables the [Sqlite] storage support for dialogues. | -| `cbor-serializer` | Enables the [CBOR] serializer for dialogues. | -| `bincode-serializer` | Enables the [Bincode] serializer for dialogues. | -| `macros` | Re-exports macros from [`teloxide-macros`]. | -| `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). | -| `rustls` | Enables the [`rustls`] TLS implementation. | -| `ctrlc_handler` | Enables the [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. | -| `auto-send` | Enables the [`AutoSend`](adaptors::AutoSend) bot adaptor. | -| `throttle` | Enables the [`Throttle`](adaptors::Throttle) bot adaptor. | -| `cache-me` | Enables the [`CacheMe`](adaptors::CacheMe) bot adaptor. | -| `trace-adaptor` | Enables the [`Trace`](adaptors::Trace) bot adaptor. | -| `erased` | Enables the [`ErasedRequester`](adaptors::ErasedRequester) bot adaptor. | -| `full` | Enables all the features except `nightly`. | -| `nightly` | Enables nightly-only features (see the [teloxide-core features]). | - -[Redis]: https://redis.io/ -[Sqlite]: https://www.sqlite.org/ -[CBOR]: https://en.wikipedia.org/wiki/CBOR -[Bincode]: https://github.com/servo/bincode -[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros -[`native-tls`]: https://docs.rs/native-tls -[`rustls`]: https://docs.rs/rustls -[`teloxide::utils::UpState`]: utils::UpState -[teloxide-core features]: https://docs.rs/teloxide-core/latest/teloxide_core/#cargo-features diff --git a/src/lib.rs b/src/lib.rs index f53ed5c5..2964cd74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,21 +38,17 @@ // [1]: https://github.com/rust-lang/rustfmt/issues/4210 // [2]: https://github.com/rust-lang/rustfmt/issues/4787 // [3]: https://github.com/rust-lang/rust/issues/82768#issuecomment-803935643 -#![cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("features.txt")))] +#![cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("features.md")))] // https://github.com/teloxide/teloxide/raw/master/logo.svg doesn't work in html_logo_url, I don't know why. #![doc( html_logo_url = "https://github.com/teloxide/teloxide/raw/master/ICON.png", html_favicon_url = "https://github.com/teloxide/teloxide/raw/master/ICON.png" )] -// We pass "--cfg docsrs" when building docs to add `This is supported on -// feature="..." only.` -// -// "--cfg dep_docsrs" is used for the same reason, but for `teloxide-core`. -// // To properly build docs of this crate run // ```console -// $ RUSTFLAGS="--cfg dep_docsrs" RUSTDOCFLAGS="--cfg docsrs -Znormalize-docs" cargo +nightly doc --open --all-features +// $ cargo docs --open // ``` +// (docs is an alias from `.cargo/config.toml`) #![cfg_attr(all(docsrs, feature = "nightly"), feature(doc_cfg, doc_auto_cfg))] #![forbid(unsafe_code)] #![warn(rustdoc::broken_intra_doc_links)] diff --git a/src/utils/html.rs b/src/utils/html.rs index 11ae475d..dc655fbf 100644 --- a/src/utils/html.rs +++ b/src/utils/html.rs @@ -191,6 +191,8 @@ mod tests { last_name: None, username: Some("abcd".to_string()), language_code: None, + is_premium: false, + added_to_attachment_menu: false, }; assert_eq!(user_mention_or_link(&user_with_username), "@abcd"); let user_without_username = User { @@ -200,6 +202,8 @@ mod tests { last_name: None, username: None, language_code: None, + is_premium: false, + added_to_attachment_menu: false, }; assert_eq!( user_mention_or_link(&user_without_username), diff --git a/src/utils/markdown.rs b/src/utils/markdown.rs index c07d53d9..4419feb9 100644 --- a/src/utils/markdown.rs +++ b/src/utils/markdown.rs @@ -240,6 +240,8 @@ mod tests { last_name: None, username: Some("abcd".to_string()), language_code: None, + is_premium: false, + added_to_attachment_menu: false, }; assert_eq!(user_mention_or_link(&user_with_username), "@abcd"); let user_without_username = User { @@ -249,6 +251,8 @@ mod tests { last_name: None, username: None, language_code: None, + is_premium: false, + added_to_attachment_menu: false, }; assert_eq!( user_mention_or_link(&user_without_username), diff --git a/tests/command.rs b/tests/command.rs index b3e6609e..4b59ac48 100644 --- a/tests/command.rs +++ b/tests/command.rs @@ -2,7 +2,7 @@ #![allow(clippy::nonstandard_macro_braces)] #[cfg(feature = "macros")] -use teloxide::utils::command::{BotCommands, ParseError}; +use teloxide::utils::command::BotCommands; // We put tests here because macro expand in unit tests in module // teloxide::utils::command was a failure @@ -141,22 +141,33 @@ fn parse_with_split2() { #[test] #[cfg(feature = "macros")] fn parse_custom_parser() { - fn custom_parse_function(s: String) -> Result<(u8, String), ParseError> { - let vec = s.split_whitespace().collect::>(); - let (left, right) = match vec.as_slice() { - [l, r] => (l, r), - _ => return Err(ParseError::IncorrectFormat("might be 2 arguments!".into())), - }; - left.parse::() - .map(|res| (res, (*right).to_string())) - .map_err(|_| ParseError::Custom("First argument must be a integer!".to_owned().into())) + mod parser { + use teloxide::utils::command::ParseError; + + pub fn custom_parse_function(s: String) -> Result<(u8, String), ParseError> { + let vec = s.split_whitespace().collect::>(); + let (left, right) = match vec.as_slice() { + [l, r] => (l, r), + _ => return Err(ParseError::IncorrectFormat("might be 2 arguments!".into())), + }; + left.parse::().map(|res| (res, (*right).to_string())).map_err(|_| { + ParseError::Custom("First argument must be a integer!".to_owned().into()) + }) + } } + use parser::custom_parse_function; + #[derive(BotCommands, Debug, PartialEq)] #[command(rename = "lowercase")] enum DefaultCommands { #[command(parse_with = "custom_parse_function")] Start(u8, String), + + // Test . + #[command(parse_with = "parser::custom_parse_function")] + TestPath(u8, String), + Help, } @@ -164,6 +175,10 @@ fn parse_custom_parser() { DefaultCommands::Start(10, "hello".to_string()), DefaultCommands::parse("/start 10 hello", "").unwrap() ); + assert_eq!( + DefaultCommands::TestPath(10, "hello".to_string()), + DefaultCommands::parse("/testpath 10 hello", "").unwrap() + ); } #[test]