Merge pull request #678 from teloxide/dev

Merge v0.10.0
This commit is contained in:
Hirrolot 2022-07-21 17:03:55 +06:00 committed by GitHub
commit 9f1a5302ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 1294 additions and 596 deletions

12
.cargo/config.toml Normal file
View file

@ -0,0 +1,12 @@
[alias]
# We pass "--cfg docsrs" when building docs to turn on nightly-only rustdoc features like
# `This is supported on feature="..." only.`
#
# "--cfg dep_docsrs" is used for the same reason, but for `teloxide-core`.
docs = """
doc
--all-features
--config build.rustflags=["--cfg=dep_docsrs"]
--config build.rustdocflags=["--cfg=docsrs","-Znormalize-docs"]
-Zrustdoc-scrape-examples=examples
"""

View file

@ -7,47 +7,64 @@ on:
name: Continuous integration
env:
RUSTFLAGS: "--cfg CI_REDIS"
RUSTFLAGS: "--cfg CI_REDIS -Dwarnings"
RUSTDOCFLAGS: -Dwarnings
RUST_BACKTRACE: short
CARGO_INCREMENTAL: 0
CARGO_NET_RETRY: 10
RUSTUP_MAX_RETRIES: 10
rust_nightly: nightly-2022-07-01
# When updating this, also update:
# - README.md
# - src/lib.rs
# - down below in a matrix
rust_msrv: 1.58.0
jobs:
style:
# Depends on all action that are required for a "successful" CI run.
ci-pass:
name: CI succeeded
runs-on: ubuntu-latest
needs:
- fmt
- test
- check-examples
- clippy
- doc
steps:
- run: exit 0
fmt:
name: fmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Checkout
uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_nightly }}
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2022-05-06
toolchain: ${{ env.rust_nightly }}
override: true
components: rustfmt
- name: fmt
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1
- name: Check formatting
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2022-05-06
override: true
components: clippy
- name: clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets --all-features -- -D warnings
args: --all -- --check
test:
name: Test
runs-on: ubuntu-latest
strategy:
matrix:
@ -65,48 +82,120 @@ jobs:
toolchain: beta
features: "--features full"
- rust: nightly
toolchain: nightly-2022-05-06
toolchain: nightly-2022-07-01
features: "--all-features"
- rust: msrv
toolchain: "1.58.0"
toolchain: 1.58.0
features: "--features full"
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install Rust ${{ matrix.toolchain }}
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: ${{ matrix.toolchain }}
override: true
- name: build
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1
# NB. Don't test (build) examples so we can use non-msrv features in them (--tests/--doc)
- name: Compile
uses: actions-rs/cargo@v1
with:
command: build
args: --verbose ${{ matrix.features }}
command: test
args: --tests --no-run --verbose ${{ matrix.features }}
- name: Setup redis
run: |
sudo apt install redis-server
redis-server --port 7777 > /dev/null &
redis-server --port 7778 > /dev/null &
redis-server --port 7779 > /dev/null &
- name: test
- name: Test unit & integration tests
uses: actions-rs/cargo@v1
with:
command: test
args: --verbose ${{ matrix.features }}
args: --tests --verbose ${{ matrix.features }}
- name: Test documentation tests
if: ${{ matrix.rust != 'msrv' }}
uses: actions-rs/cargo@v1
with:
command: test
args: --doc --verbose ${{ matrix.features }}
build-example:
check-examples:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Checkout
uses: actions/checkout@v3
- name: Install Rust stable
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: Check the examples
run: cargo check --examples --features="full"
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1
- name: Check examples
uses: actions-rs/cargo@v1
with:
command: check
args: --examples --features full
clippy:
name: Run linter
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_nightly }}
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: ${{ env.rust_nightly }}
override: true
components: clippy
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1
- name: clippy
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets --all-features
doc:
name: check docs
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_nightly }}
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: ${{ env.rust_nightly }}
override: true
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1
- name: rustdoc
uses: actions-rs/cargo@v1
with:
command: docs # from .cargo/config.toml

View file

@ -6,6 +6,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## unreleased
## 0.10.0 - 2022-07-21
### Added
- Security checks based on `secret_token` param of `set_webhook` to built-in webhooks.
- `dispatching::update_listeners::{PollingBuilder, Polling, PollingStream}`.
- `DispatcherBuilder::enable_ctrlc_handler` method.
### Fixed
- `Dispatcher` no longer "leaks" memory for every inactive user ([PR 657](https://github.com/teloxide/teloxide/pull/657)).
- Allow specifying a path to a custom command parser in `parse_with` ([issue 668](https://github.com/teloxide/teloxide/issues/668)).
### Changed
- Add the `Key: Clone` requirement for `impl Dispatcher` [**BC**].
- `dispatching::update_listeners::{polling_default, polling}` now return a named, `Polling<_>` type.
- Update teloxide-core to v0.7.0 with Bot API 6.1 support, see [its changelog][core07c] for more information [**BC**].
[core07c]: https://github.com/teloxide/teloxide-core/blob/master/CHANGELOG.md#070---2022-07-19
### Deprecated
- The `dispatching::update_listeners::polling` function.
- `Dispatcher::setup_ctrlc_handler` method.
## 0.9.2 - 2022-06-07
### Fixed

View file

@ -1,6 +1,6 @@
[package]
name = "teloxide"
version = "0.9.2"
version = "0.10.0"
edition = "2021"
description = "An elegant Telegram bots framework for Rust"
repository = "https://github.com/teloxide/teloxide"
@ -14,7 +14,8 @@ exclude = ["media"]
[features]
default = ["native-tls", "ctrlc_handler", "teloxide-core/default", "auto-send"]
webhooks-axum = ["axum", "tower", "tower-http"]
webhooks = ["rand"]
webhooks-axum = ["webhooks", "axum", "tower", "tower-http"]
sqlite-storage = ["sqlx"]
redis-storage = ["redis"]
@ -56,16 +57,21 @@ full = [
]
[dependencies]
teloxide-core = { version = "0.6.0", default-features = false }
teloxide-macros = { version = "0.6.2", optional = true }
teloxide-core = { version = "0.7.0", default-features = false }
teloxide-macros = { version = "0.6.3", optional = true }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
dptree = "0.2.1"
dptree = "0.3.0"
# These lines are used only for development.
# teloxide-core = { git = "https://github.com/teloxide/teloxide-core", rev = "b13393d", default-features = false }
# teloxide-macros = { git = "https://github.com/teloxide/teloxide-macros", rev = "44d91c5", optional = true }
# dptree = { git = "https://github.com/teloxide/dptree", rev = "df578e4" }
tokio = { version = "1.8", features = ["fs"] }
tokio-util = "0.6"
tokio-util = "0.7"
tokio-stream = "0.1.8"
url = "2.2.2"
@ -80,27 +86,26 @@ pin-project = "1.0"
serde_with_macros = "1.4"
aquamarine = "0.1.11"
sqlx = { version = "0.5", optional = true, default-features = false, features = [
sqlx = { version = "0.6", optional = true, default-features = false, features = [
"runtime-tokio-native-tls",
"macros",
"sqlite",
] }
redis = { version = "0.20", features = ["tokio-comp"], optional = true }
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
serde_cbor = { version = "0.11", optional = true }
bincode = { version = "1.3", optional = true }
axum = { version = "0.4.8", optional = true }
axum = { version = "0.5.13", optional = true }
tower = { version = "0.4.12", optional = true }
tower-http = { version = "0.2.5", features = ["trace"], optional = true }
tower-http = { version = "0.3.4", features = ["trace"], optional = true }
rand = { version = "0.8.5", optional = true }
[dev-dependencies]
rand = "0.8.3"
pretty_env_logger = "0.4.0"
once_cell = "1.9.0"
serde = "1"
serde_json = "1"
tokio = { version = "1.8", features = ["fs", "rt-multi-thread", "macros"] }
warp = "0.3.0"
reqwest = "0.10.4"
reqwest = "0.11.11"
chrono = "0.4"
tokio-stream = "0.1"
@ -153,6 +158,10 @@ required-features = ["macros"]
name = "ngrok_ping_pong"
required-features = ["webhooks-axum"]
[[example]]
name = "heroku_ping_pong"
required-features = ["webhooks-axum"]
[[example]]
name = "purchase"
required-features = ["macros"]

View file

@ -1,6 +1,47 @@
This document describes breaking changes of `teloxide` crate, as well as the ways to update code.
Note that the list of required changes is not fully exhaustive and it may lack something in rare cases.
## 0.9 -> 0.10
### core
We've added some convenience functions to `InlineKeyboardButton` so it's easier to construct it. Consider using them instead of variants:
```diff
-InlineKeyboardButton::new("text", InlineKeyboardButtonKind::Url(url))
+InlineKeyboardButton::url("text", url)
```
`file_size` fields are now `u32`, you may need to update your code accordingly:
```diff
-let file_size: u64 = audio.file_size?;
+let file_size: u32 = audio.file_size;
```
Some places now use `FileMeta` instead of `File`, you may need to change types.
`Sticker` and `StickerSet` now has a `kind` field instead of `is_animated` and `is_video`:
```diff
+use teloxide::types::StickerKind::*;
-match () {
+match sticker.kind {
- _ if sticker.is_animated => /* handle animated */,
+ Animated => /* handle animated */,
- _ if sticker.is_video => /* handle video */,
+ Video => /* handle video */,
- _ => /* handle normal */,
+ Webp => /* handle normal */,
}
```
### teloxide
Teloxide itself doesn't have any major API changes.
Note however that some function were deprecated:
- Instead of `dispatching::update_listeners::polling` use `polling_builder`
- Instead of `Dispatcher::setup_ctrlc_handler` use `DispatcherBuilder::enable_ctrlc_handler`
## 0.7 -> 0.8
### core
@ -8,7 +49,7 @@ Note that the list of required changes is not fully exhaustive and it may lack s
`user.id` now uses `UserId` type, `ChatId` now represents only _chat id_, not channel username, all `chat_id` function parameters now accept `Recipient` (if they allow for channel usernames).
If you used to work with chat/user ids (for example saving them to a database), you may need to change your code to account for new types. Some examples how that may look like:
```diff,
```diff
-let user_id: i64 = user.id;
+let UserId(user_id) = user.id;
db.save(user_id, ...);

View file

@ -1,6 +1,4 @@
> [v0.7 -> v0.8 migration guide >>](MIGRATION_GUIDE.md#07---08)
> `teloxide-core` versions less that `0.4.5` (`teloxide` versions less than 0.7.3) have a low-severity security vulnerability, [learn more >>](https://github.com/teloxide/teloxide/discussions/574)
> [v0.9 -> v0.10 migration guide >>](MIGRATION_GUIDE.md#09---010)
<div align="center">
<img src="../../raw/master/ICON.png" width="250"/>
@ -15,7 +13,7 @@
<img src="https://img.shields.io/crates/v/teloxide.svg">
</a>
<a href="https://core.telegram.org/bots/api">
<img src="https://img.shields.io/badge/API%20coverage-Up%20to%206.0%20(inclusively)-green.svg">
<img src="https://img.shields.io/badge/API%20coverage-Up%20to%206.1%20(inclusively)-green.svg">
</a>
<a href="https://t.me/teloxide">
<img src="https://img.shields.io/badge/support-t.me%2Fteloxide-blueviolet">
@ -72,7 +70,7 @@ $ rustup override set nightly
5. Run `cargo new my_bot`, enter the directory and put these lines into your `Cargo.toml`:
```toml
[dependencies]
teloxide = { version = "0.9", features = ["macros", "auto-send"] }
teloxide = { version = "0.10", features = ["macros", "auto-send"] }
log = "0.4"
pretty_env_logger = "0.4"
tokio = { version = "1.8", features = ["rt-multi-thread", "macros"] }
@ -105,9 +103,7 @@ async fn main() {
```
<div align="center">
<kbd>
<img src=../../raw/master/media/throw-dice.gif width="420px" />
</kbd>
<img src="../../raw/master/media/throw-dice.gif" width="420" />
</div>
### Commands
@ -175,9 +171,7 @@ async fn answer(
```
<div align="center">
<kbd>
<img src=../../raw/master/media/command.gif width="420px" />
</kbd>
<img src="../../raw/master/media/command.gif" width="420" />
</div>
### Dialogues management
@ -229,8 +223,8 @@ async fn main() {
),
)
.dependencies(dptree::deps![InMemStorage::<State>::new()])
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch()
.await;
}
@ -300,9 +294,7 @@ async fn receive_location(
```
<div align="center">
<kbd>
<img src=../../raw/master/media/dialogue.gif width="420px" />
</kbd>
<img src="../../raw/master/media/dialogue.gif" width="420" />
</div>
[More examples >>](examples/)
@ -327,11 +319,7 @@ A: No, only the bots API.
**Q: Can I use webhooks?**
A: teloxide doesn't provide a special API for working with webhooks due to their nature with lots of subtle settings. Instead, you should setup your webhook by yourself, as shown in [`examples/ngrok_ping_pong_bot`](examples/ngrok_ping_pong.rs) and [`examples/heroku_ping_pong_bot`](examples/heroku_ping_pong.rs).
Associated links:
- [Marvin's Marvellous Guide to All Things Webhook](https://core.telegram.org/bots/webhooks)
- [Using self-signed certificates](https://core.telegram.org/bots/self-signed)
A: You can! Teloxide has a built-in support for webhooks in `dispatching::update_listeners::webhooks` module. See how it's used in [`examples/ngrok_ping_pong_bot`](examples/ngrok_ping_pong.rs) and [`examples/heroku_ping_pong_bot`](examples/heroku_ping_pong.rs).
**Q: Can I handle both callback queries and messages within a single dialogue?**
@ -341,29 +329,33 @@ A: Yes, see [`examples/purchase.rs`](examples/purchase.rs).
Feel free to propose your own bot to our collection!
- [WaffleLapkin/crate_upd_bot](https://github.com/WaffleLapkin/crate_upd_bot) — A bot that notifies about crate updates.
- [mxseev/logram](https://github.com/mxseev/logram) — Utility that takes logs from anywhere and sends them to Telegram.
- [alexkonovalov/PedigreeBot](https://github.com/alexkonovalov/PedigreeBot) — A Telegram bot for building family trees.
- [Hermitter/tepe](https://github.com/Hermitter/tepe) — A CLI to command a bot to send messages and files over Telegram.
- [mattrighetti/GroupActivityBot](https://github.com/mattrighetti/group-activity-bot-rs) — Telegram bot that keeps track of user activity in groups.
- [mattrighetti/libgen-bot-rs](https://github.com/mattrighetti/libgen-bot-rs) — Telgram bot to interface with libgen
- [dracarys18/grpmr-rs](https://github.com/dracarys18/grpmr-rs) — A Telegram group manager bot with variety of extra features.
- [steadylearner/subreddit_reader](https://github.com/steadylearner/Rust-Full-Stack/tree/master/commits/teloxide/subreddit_reader) — A bot that shows the latest posts at Rust subreddit.
- [myblackbeard/basketball-betting-bot](https://github.com/myblackbeard/basketball-betting-bot) — The bot lets you bet on NBA games against your buddies.
- [ArtHome12/vzmuinebot](https://github.com/ArtHome12/vzmuinebot) — Telegram bot for food menu navigate.
- [ArtHome12/cognito_bot](https://github.com/ArtHome12/cognito_bot) — The bot is designed to anonymize messages to a group.
- [pro-vim/tg-vimhelpbot](https://github.com/pro-vim/tg-vimhelpbot) — Link `:help` for Vim in Telegram.
- [sschiz/janitor-bot](https://github.com/sschiz/janitor-bot) — A bot that removes users trying to join to a chat that is designed for comments.
- [slondr/BeerHolderBot](https://gitlab.com/slondr/BeerHolderBot) — A bot that holds your beer.
- [MustafaSalih1993/Miss-Vodka-Telegram-Bot](https://github.com/MustafaSalih1993/Miss-Vodka-Telegram-Bot) — A Telegram bot written in rust using "Teloxide" library.
- [x13a/tg-prompt](https://github.com/x13a/tg-prompt) — Telegram prompt.
- [magnickolas/remindee-bot](https://github.com/magnickolas/remindee-bot) — Telegram bot for managing reminders.
- [cyberknight777/knight-bot](https://gitlab.com/cyberknight777/knight-bot) — A Telegram bot with variety of fun features.
- [wa7sa34cx/the-black-box-bot](https://github.com/wa7sa34cx/the-black-box-bot) — This is the Black Box Telegram bot. You can hold any items in it.
- [crapstone/hsctt](https://codeberg.org/crapstones-bots/hsctt) — A Telegram bot that searches for HTTP status codes in all messages and replies with the text form.
- [alenpaul2001/AurSearchBot](https://gitlab.com/alenpaul2001/aursearchbot) — Telegram bot for searching AUR in inline mode.
- [studiedlist/EddieBot](https://gitlab.com/studiedlist/eddie-bot) — Chatting bot with several entertainment features.
- [`raine/tgreddit`](https://github.com/raine/tgreddit) — A bot that sends the top posts of your favorite subreddits to Telegram.
- [`magnickolas/remindee-bot`](https://github.com/magnickolas/remindee-bot) — Telegram bot for managing reminders.
- [`WaffleLapkin/crate_upd_bot`](https://github.com/WaffleLapkin/crate_upd_bot) — A bot that notifies about crate updates.
- [`mattrighetti/GroupActivityBot`](https://github.com/mattrighetti/group-activity-bot-rs) — Telegram bot that keeps track of user activity in groups.
- [`alenpaul2001/AurSearchBot`](https://gitlab.com/alenpaul2001/aursearchbot) — Telegram bot for searching in Arch User Repository (AUR).
- [`ArtHome12/vzmuinebot`](https://github.com/ArtHome12/vzmuinebot) — Telegram bot for food menu navigate.
- [`studiedlist/EddieBot`](https://gitlab.com/studiedlist/eddie-bot) — Chatting bot with several entertainment features.
- [`modos189/tg_blackbox_bot`](https://gitlab.com/modos189/tg_blackbox_bot) — Anonymous feedback for your Telegram project.
- [`0xNima/spacecraft`](https://github.com/0xNima/spacecraft) — Yet another telegram bot to downloading Twitter spaces.
- [`0xNima/Twideo`](https://github.com/0xNima/Twideo) — Simple Telegram Bot for downloading videos from Twitter via their links.
- [`mattrighetti/libgen-bot-rs`](https://github.com/mattrighetti/libgen-bot-rs) — Telgram bot to interface with libgen.
- [`zamazan4ik/npaperbot-telegram`](https://github.com/zamazan4ik/npaperbot-telegram) — Telegram bot for searching via C++ proposals.
<details>
<summary>Show bots using teloxide older than v0.6.0</summary>
- [`mxseev/logram`](https://github.com/mxseev/logram) — Utility that takes logs from anywhere and sends them to Telegram.
- [`alexkonovalov/PedigreeBot`](https://github.com/alexkonovalov/PedigreeBot) — A Telegram bot for building family trees.
- [`Hermitter/tepe`](https://github.com/Hermitter/tepe) — A CLI to command a bot to send messages and files over Telegram.
- [`myblackbeard/basketball-betting-bot`](https://github.com/myblackbeard/basketball-betting-bot) — The bot lets you bet on NBA games against your buddies.
- [`dracarys18/grpmr-rs`](https://github.com/dracarys18/grpmr-rs) — Modular Telegram Group Manager Bot written in Rust.
- [`ArtHome12/vzmuinebot`](https://github.com/ArtHome12/vzmuinebot) — Telegram bot for food menu navigate.
- [`ArtHome12/cognito_bot`](https://github.com/ArtHome12/cognito_bot) — The bot is designed to anonymize messages to a group.
- [`crapstone/hsctt`](https://codeberg.org/crapstones-bots/hsctt) — A bot that converts HTTP status codes into text.
</details>
## Contributing
See [`CONRIBUTING.md`](CONTRIBUTING.md).

View file

@ -30,7 +30,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.branch(Update::filter_callback_query().endpoint(callback_handler))
.branch(Update::filter_inline_query().endpoint(inline_query_handler));
Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await;
Dispatcher::builder(bot, handler).enable_ctrlc_handler().build().dispatch().await;
Ok(())
}

View file

@ -14,18 +14,13 @@ type MyDialogue = Dialogue<State, ErasedStorage<State>>;
type MyStorage = std::sync::Arc<ErasedStorage<State>>;
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
#[derive(Clone, serde::Serialize, serde::Deserialize)]
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
pub enum State {
#[default]
Start,
GotNumber(i32),
}
impl Default for State {
fn default() -> Self {
Self::Start
}
}
#[derive(Clone, BotCommands)]
#[command(rename = "lowercase", description = "These commands are supported:")]
pub enum Command {
@ -59,8 +54,8 @@ async fn main() {
Dispatcher::builder(bot, handler)
.dependencies(dptree::deps![storage])
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch()
.await;
}

View file

@ -18,18 +18,18 @@ use teloxide::{dispatching::dialogue::InMemStorage, prelude::*};
type MyDialogue = Dialogue<State, InMemStorage<State>>;
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
#[derive(Clone)]
#[derive(Clone, Default)]
pub enum State {
#[default]
Start,
ReceiveFullName,
ReceiveAge { full_name: String },
ReceiveLocation { full_name: String, age: u8 },
}
impl Default for State {
fn default() -> Self {
Self::Start
}
ReceiveAge {
full_name: String,
},
ReceiveLocation {
full_name: String,
age: u8,
},
}
#[tokio::main]
@ -51,8 +51,8 @@ async fn main() {
),
)
.dependencies(dptree::deps![InMemStorage::<State>::new()])
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch()
.await;
}

View file

@ -87,8 +87,8 @@ async fn main() {
.error_handler(LoggingErrorHandler::with_custom_text(
"An error has occurred in the dispatcher",
))
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch()
.await;
}

View file

@ -16,25 +16,12 @@
// heroku buildpacks:set emk/rust
// ```
//
// [1] https://github.com/emk/heroku-buildpack-rust
// [1]: https://github.com/emk/heroku-buildpack-rust
// TODO: use built-in webhook support
use std::env;
use teloxide::{
dispatching::{
stop_token::AsyncStopToken,
update_listeners::{self, StatefulListener},
},
prelude::*,
types::Update,
};
use std::{convert::Infallible, env, net::SocketAddr};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::Filter;
use reqwest::{StatusCode, Url};
use teloxide::{dispatching::update_listeners::webhooks, prelude::*};
use url::Url;
#[tokio::main]
async fn main() {
@ -42,66 +29,31 @@ async fn main() {
log::info!("Starting Heroku ping-pong bot...");
let bot = Bot::from_env().auto_send();
let token = bot.inner().token();
// Heroku auto defines a port value
let port: u16 = env::var("PORT")
.expect("PORT env variable is not set")
.parse()
.expect("PORT env variable value is not an integer");
let addr = ([0, 0, 0, 0], port).into();
// Heroku host example: "heroku-ping-pong-bot.herokuapp.com"
let host = env::var("HOST").expect("HOST env variable is not set");
let url = Url::parse(&format!("https://{host}/webhooks/{token}")).unwrap();
let listener = webhooks::axum(bot.clone(), webhooks::Options::new(addr, url))
.await
.expect("Couldn't setup webhook");
teloxide::repl_with_listener(
bot.clone(),
bot,
|msg: Message, bot: AutoSend<Bot>| async move {
bot.send_message(msg.chat.id, "pong").await?;
respond(())
},
webhook(bot).await,
listener,
)
.await;
}
async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, Infallible> {
log::error!("Cannot process the request due to: {:?}", error);
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// Heroku auto defines a port value
let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing");
let port: u16 = env::var("PORT")
.expect("PORT env variable missing")
.parse()
.expect("PORT value to be integer");
// Heroku host example .: "heroku-ping-pong-bot.herokuapp.com"
let host = env::var("HOST").expect("have HOST env variable");
let path = format!("bot{teloxide_token}");
let url = Url::parse(&format!("https://{host}/{path}")).unwrap();
bot.set_webhook(url).await.expect("Cannot setup a webhook");
let (tx, rx) = mpsc::unbounded_channel();
let server = warp::post()
.and(warp::path(path))
.and(warp::body::json())
.map(move |update: Update| {
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook");
StatusCode::OK
})
.recover(handle_rejection);
let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let addr = format!("0.0.0.0:{port}").parse::<SocketAddr>().unwrap();
let server = warp::serve(server);
let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate.
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S {
&mut state.0
}
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| {
state.1.clone()
})
}

View file

@ -60,5 +60,5 @@ async fn main() {
},
));
Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await;
Dispatcher::builder(bot, handler).enable_ctrlc_handler().build().dispatch().await;
}

View file

@ -13,10 +13,7 @@
// ```
use teloxide::{
dispatching::{
dialogue::{self, InMemStorage},
UpdateHandler,
},
dispatching::{dialogue, dialogue::InMemStorage, UpdateHandler},
prelude::*,
types::{InlineKeyboardButton, InlineKeyboardMarkup},
utils::command::BotCommands,
@ -25,17 +22,14 @@ use teloxide::{
type MyDialogue = Dialogue<State, InMemStorage<State>>;
type HandlerResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
#[derive(Clone)]
#[derive(Clone, Default)]
pub enum State {
#[default]
Start,
ReceiveFullName,
ReceiveProductChoice { full_name: String },
}
impl Default for State {
fn default() -> Self {
Self::Start
}
ReceiveProductChoice {
full_name: String,
},
}
#[derive(BotCommands, Clone)]
@ -58,29 +52,30 @@ async fn main() {
Dispatcher::builder(bot, schema())
.dependencies(dptree::deps![InMemStorage::<State>::new()])
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch()
.await;
}
fn schema() -> UpdateHandler<Box<dyn std::error::Error + Send + Sync + 'static>> {
use dptree::case;
let command_handler = teloxide::filter_command::<Command, _>()
.branch(
dptree::case![State::Start]
.branch(dptree::case![Command::Help].endpoint(help))
.branch(dptree::case![Command::Start].endpoint(start)),
case![State::Start]
.branch(case![Command::Help].endpoint(help))
.branch(case![Command::Start].endpoint(start)),
)
.branch(dptree::case![Command::Cancel].endpoint(cancel));
.branch(case![Command::Cancel].endpoint(cancel));
let message_handler = Update::filter_message()
.branch(command_handler)
.branch(dptree::case![State::ReceiveFullName].endpoint(receive_full_name))
.branch(case![State::ReceiveFullName].endpoint(receive_full_name))
.branch(dptree::endpoint(invalid_state));
let callback_query_handler = Update::filter_callback_query().chain(
dptree::case![State::ReceiveProductChoice { full_name }]
.endpoint(receive_product_selection),
let callback_query_handler = Update::filter_callback_query().branch(
case![State::ReceiveProductChoice { full_name }].endpoint(receive_product_selection),
);
dialogue::enter::<Update, InMemStorage<State>, State, _>()

View file

@ -1,27 +1,34 @@
// This bot answers how many messages it received in total on every message.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use once_cell::sync::Lazy;
use teloxide::prelude::*;
static MESSAGES_TOTAL: Lazy<AtomicU64> = Lazy::new(AtomicU64::default);
#[tokio::main]
async fn main() {
pretty_env_logger::init();
log::info!("Starting shared state bot...");
let bot = Bot::from_env().auto_send();
let messages_total = Arc::new(AtomicU64::new(0));
let handler = Update::filter_message().branch(dptree::endpoint(
|msg: Message, bot: AutoSend<Bot>| async move {
let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed);
let handler = Update::filter_message().endpoint(
|msg: Message, bot: AutoSend<Bot>, messages_total: Arc<AtomicU64>| async move {
let previous = messages_total.fetch_add(1, Ordering::Relaxed);
bot.send_message(msg.chat.id, format!("I received {previous} messages in total."))
.await?;
respond(())
},
));
);
Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await;
Dispatcher::builder(bot, handler)
// Pass the shared state to the handler as a dependency.
.dependencies(dptree::deps![messages_total])
.enable_ctrlc_handler()
.build()
.dispatch()
.await;
}

View file

@ -1,4 +1,4 @@
[toolchain]
channel = "nightly-2022-05-06"
channel = "nightly-2022-07-01"
components = ["rustfmt", "clippy"]
profile = "minimal"

View file

@ -1,99 +1,198 @@
//! An update dispatching model based on [`dptree`].
//!
//! In teloxide, updates are dispatched by a pipleine. The central type is
//! [`dptree::Handler`] -- it represents a handler of an update; since the API
//! is highly declarative, you can combine handlers with each other via such
//! methods as [`dptree::Handler::chain`] and [`dptree::Handler::branch`]. The
//! former method pipes one handler to another one, whilst the latter creates a
//! new node, as communicated by the name. For more information, please refer to
//! the documentation of [`dptree`].
//! In teloxide, update dispatching is declarative: it takes the form of a
//! [chain of responsibility] pattern enriched with a number of combinator
//! functions, which together form an instance of the [`dptree::Handler`] type.
//!
//! The pattern itself is called [chain of responsibility], a well-known design
//! technique across OOP developers. But unlike typical object-oriented design,
//! we employ declarative FP-style functions like [`dptree::filter`],
//! [`dptree::filter_map`], and [`dptree::endpoint`]; these functions create
//! special forms of [`dptree::Handler`]; for more information, please refer to
//! their respective documentation. Each of these higher-order functions accept
//! a closure that is made into a handler -- this closure can take any
//! additional parameters, which must be supplied while creating [`Dispatcher`]
//! (see [`DispatcherBuilder::dependencies`]).
//!
//! The [`Dispatcher`] type puts all these things together: it only provides
//! [`Dispatcher::dispatch`] and a handful of other methods. Once you call
//! `.dispatch()`, it will retrieve updates from the Telegram server and pass
//! them to your handler, which is a parameter of [`Dispatcher::builder`].
//!
//! Let us look at a simple example:
//!
//!
//! ([Full](https://github.com/teloxide/teloxide/blob/master/examples/shared_state.rs))
//! Take [`examples/purchase.rs`] as an example of dispatching logic. First, we
//! define a type named `State` to represent the current state of a dialogue:
//!
//! ```no_run
//! use std::sync::atomic::{AtomicU64, Ordering};
//!
//! use once_cell::sync::Lazy;
//! use teloxide::prelude::*;
//!
//! static MESSAGES_TOTAL: Lazy<AtomicU64> = Lazy::new(AtomicU64::default);
//!
//! # #[tokio::main]
//! # async fn main() {
//! pretty_env_logger::init();
//! log::info!("Starting shared state bot...");
//!
//! let bot = Bot::from_env().auto_send();
//!
//! let handler = Update::filter_message().branch(dptree::endpoint(
//! |msg: Message, bot: AutoSend<Bot>| async move {
//! let previous = MESSAGES_TOTAL.fetch_add(1, Ordering::Relaxed);
//! bot.send_message(msg.chat.id, format!("I received {} messages in total.", previous))
//! .await?;
//! respond(())
//! #[derive(Clone, Default)]
//! pub enum State {
//! #[default]
//! Start,
//! ReceiveFullName,
//! ReceiveProductChoice {
//! full_name: String,
//! },
//! ));
//!
//! Dispatcher::builder(bot, handler).build().setup_ctrlc_handler().dispatch().await;
//! # }
//! }
//! ```
//!
//! 1. First, we create the bot: `let bot = Bot::from_env().auto_send()`.
//! 2. Then we construct an update handler. While it is possible to handle all
//! kinds of [`crate::types::Update`], here we are only interested in
//! [`crate::types::Message`]: [`UpdateFilterExt::filter_message`] create a
//! handler object which filters all messages out of a generic update.
//! 3. By doing `.branch(dptree::endpoint(...))`, we set up a custom handling
//! closure that receives `msg: Message` and `bot: AutoSend<Bot>`. There are
//! called dependencies: `msg` is supplied by
//! [`UpdateFilterExt::filter_message`], while `bot` is supplied by
//! [`Dispatcher`].
//! Then, we define a type `Command` to represent user commands such as
//! `/start` or `/help`:
//!
//! That being said, if we receive a message, the dispatcher will call our
//! handler, but if we receive something other than a message (e.g., a channel
//! post), you will see an unhandled update notice in your terminal.
//! ```no_run
//! # use teloxide::utils::command::BotCommands;
//! #[derive(BotCommands, Clone)]
//! #[command(rename = "lowercase", description = "These commands are supported:")]
//! enum Command {
//! #[command(description = "display this text.")]
//! Help,
//! #[command(description = "start the purchase procedure.")]
//! Start,
//! #[command(description = "cancel the purchase procedure.")]
//! Cancel,
//! }
//! ```
//!
//! This is a very limited example of update pipelining facilities. In more
//! involved scenarios, there are multiple branches and chains; if one element
//! of a chain fails to handle an update, the update will be passed forwards; if
//! no handler succeeds at handling the update, [`Dispatcher`] will invoke a
//! default handler set up via [`DispatcherBuilder::default_handler`].
//! Now the key question: how to elegantly dispatch on different combinations of
//! `State`, `Command`, and Telegram updates? -- i.e., we may want to execute
//! specific endpoints only in response to specific user commands and while we
//! are in a given dialogue state (and possibly under other circumstances!). The
//! solution is to use [`dptree`]:
//!
//! Update pipelining provides several advantages over the typical `match
//! (update.kind) { ... }` approach:
//! ```no_run
//! # // That's a lot of context needed to compile this, oof
//! # use teloxide::dispatching::{UpdateHandler, UpdateFilterExt, dialogue, dialogue::InMemStorage};
//! # use teloxide::utils::command::BotCommands;
//! # use teloxide::types::Update;
//! # #[derive(Clone, Default)] pub enum State { #[default] Start, ReceiveFullName, ReceiveProductChoice { full_name: String } }
//! # #[derive(BotCommands, Clone)] enum Command { Help, Start, Cancel }
//! # type HandlerResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
//! # async fn help() -> HandlerResult { Ok(()) }
//! # async fn start() -> HandlerResult { Ok(()) }
//! # async fn cancel() -> HandlerResult { Ok(()) }
//! # async fn receive_full_name() -> HandlerResult { Ok(()) }
//! # async fn invalid_state() -> HandlerResult { Ok(()) }
//! # async fn receive_product_selection() -> HandlerResult { Ok(()) }
//! #
//! fn schema() -> UpdateHandler<Box<dyn std::error::Error + Send + Sync + 'static>> {
//! use dptree::case;
//!
//! 1. It supports _extension_: e.g., you
//! can define extension filters or some other handlers and then combine them in
//! a single place, thus facilitating loose coupling.
//! 2. Pipelining exhibits a natural syntax for expressing message processing.
//! 3. Lastly, it provides a primitive form of [dependency injection (DI)],
//! which allows you to deal with such objects as a bot and various update types
//! easily.
//! let command_handler = teloxide::filter_command::<Command, _>()
//! .branch(
//! case![State::Start]
//! .branch(case![Command::Help].endpoint(help))
//! .branch(case![Command::Start].endpoint(start)),
//! )
//! .branch(case![Command::Cancel].endpoint(cancel));
//!
//! For a more involved example, see [`examples/dispatching_features.rs`](https://github.com/teloxide/teloxide/blob/master/examples/dispatching_features.rs).
//! let message_handler = Update::filter_message()
//! .branch(command_handler)
//! .branch(case![State::ReceiveFullName].endpoint(receive_full_name))
//! .branch(dptree::endpoint(invalid_state));
//!
//! TODO: explain a more involved example with multiple branches.
//! let callback_query_handler = Update::filter_callback_query().branch(
//! case![State::ReceiveProductChoice { full_name }].endpoint(receive_product_selection),
//! );
//!
//! dialogue::enter::<Update, InMemStorage<State>, State, _>()
//! .branch(message_handler)
//! .branch(callback_query_handler)
//! }
//! ```
//!
//! The overall logic should be clear. Throughout the above example, we use
//! several techniques:
//!
//! - **Branching:** `a.branch(b)` roughly means "try to handle an update with
//! `a`, then, if it
//! neglects the update, try `b`".
//! - **Pattern matching:** We also use the [`dptree::case!`] macro
//! extensively, which acts as a filter on an enumeration: if it is of a
//! certain variant, it passes the variant's payload down the handler chain;
//! otherwise, it neglects an update.
//! - **Endpoints:** To specify the final function to handle an update, we use
//! [`dptree::Handler::endpoint`].
//!
//! Notice the clear and uniform code structure: regardless of the dispatch
//! criteria, we use the same program constructions. In future, you may want to
//! introduce your application-specific filters or data structures to match upon
//! -- no problem, reuse [`dptree::Handler::filter`], [`dptree::case!`], and
//! other combinators in the same way!
//!
//! Finally, we define our endpoints like this:
//!
//! ```no_run
//! # use teloxide::{Bot, adaptors::AutoSend};
//! # use teloxide::types::{Message, CallbackQuery};
//! # use teloxide::dispatching::dialogue::{InMemStorage, Dialogue};
//! # enum State{}
//! #
//! type MyDialogue = Dialogue<State, InMemStorage<State>>;
//! type HandlerResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;
//!
//! async fn start(bot: AutoSend<Bot>, msg: Message, dialogue: MyDialogue) -> HandlerResult {
//! todo!()
//! }
//!
//! async fn help(bot: AutoSend<Bot>, msg: Message) -> HandlerResult {
//! todo!()
//! }
//!
//! async fn cancel(bot: AutoSend<Bot>, msg: Message, dialogue: MyDialogue) -> HandlerResult {
//! todo!()
//! }
//!
//! async fn invalid_state(bot: AutoSend<Bot>, msg: Message) -> HandlerResult {
//! todo!()
//! }
//!
//! async fn receive_full_name(
//! bot: AutoSend<Bot>,
//! msg: Message,
//! dialogue: MyDialogue,
//! ) -> HandlerResult {
//! todo!()
//! }
//!
//! async fn receive_product_selection(
//! bot: AutoSend<Bot>,
//! q: CallbackQuery,
//! dialogue: MyDialogue,
//! full_name: String,
//! ) -> HandlerResult {
//! todo!()
//! }
//! ```
//!
//! Each parameter is supplied as a dependency by teloxide. In particular:
//! - `bot: AutoSend<Bot>` comes from the dispatcher (see below);
//! - `msg: Message` comes from [`Update::filter_message`];
//! - `q: CallbackQuery` comes from [`Update::filter_callback_query`];
//! - `dialogue: MyDialogue` comes from [`dialogue::enter`];
//! - `full_name: String` comes from `dptree::case![State::ReceiveProductChoice
//! { full_name }]`.
//!
//! Inside `main`, we plug the schema into [`Dispatcher`] like this:
//!
//! ```no_run
//! # use teloxide::Bot;
//! # use teloxide::requests::RequesterExt;
//! # use teloxide::dispatching::{Dispatcher, dialogue::InMemStorage};
//! # enum State {}
//! # fn schema() -> teloxide::dispatching::UpdateHandler<Box<dyn std::error::Error + Send + Sync + 'static>> { teloxide::dptree::entry() }
//! #[tokio::main]
//! async fn main() {
//! let bot = Bot::from_env().auto_send();
//!
//! Dispatcher::builder(bot, schema())
//! .dependencies(dptree::deps![InMemStorage::<State>::new()])
//! .enable_ctrlc_handler()
//! .build()
//! .dispatch()
//! .await;
//! }
//! ```
//!
//! In a call to [`DispatcherBuilder::dependencies`], we specify a list of
//! additional dependencies that all handlers will receive as parameters. Here,
//! we only specify an in-memory storage of dialogues needed for
//! [`dialogue::enter`]. However, in production bots, you normally also pass a
//! database connection, configuration, and other stuff.
//!
//! All in all, [`dptree`] can be seen as an extensible alternative to pattern
//! matching, with support for [dependency injection (DI)] and a few other
//! useful features. See [`examples/dispatching_features.rs`] as a more involved
//! example.
//!
//! [`examples/purchase.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/purchase.rs
//! [`Update::filter_message`]: crate::types::Update::filter_message
//! [`Update::filter_callback_query`]: crate::types::Update::filter_callback_query
//! [chain of responsibility]: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
//! [dependency injection (DI)]: https://en.wikipedia.org/wiki/Dependency_injection
//! [`examples/dispatching_features.rs`]: https://github.com/teloxide/teloxide/blob/master/examples/dispatching_features.rs
#[cfg(all(feature = "ctrlc_handler"))]
pub mod repls;

View file

@ -4,17 +4,19 @@
//! wrapper over [`Storage`] and a chat ID. All it does is provides convenient
//! method for manipulating the dialogue state. [`Storage`] is where all
//! dialogue states are stored; it can be either [`InMemStorage`], which is a
//! simple hash map, or database wrappers such as [`SqliteStorage`]. In the
//! latter case, your dialogues are _persistent_, meaning that you can safely
//! restart your bot and all dialogues will remain in the database -- this is a
//! preferred method for production bots.
//! simple hash map from [`std::collections`], or an advanced database wrapper
//! such as [`SqliteStorage`]. In the latter case, your dialogues are
//! _persistent_, meaning that you can safely restart your bot and all ongoing
//! dialogues will remain in the database -- this is a preferred method for
//! production bots.
//!
//! [`examples/dialogue.rs`] clearly demonstrates the typical usage of
//! dialogues. Your dialogue state can be represented as an enumeration:
//!
//! ```ignore
//! #[derive(Clone)]
//! #[derive(Clone, Default)]
//! pub enum State {
//! #[default]
//! Start,
//! ReceiveFullName,
//! ReceiveAge { full_name: String },
@ -30,8 +32,8 @@
//! bot: AutoSend<Bot>,
//! msg: Message,
//! dialogue: MyDialogue,
//! (full_name,): (String,), // Available from `State::ReceiveAge`.
//! ) -> anyhow::Result<()> {
//! full_name: String, // Available from `State::ReceiveAge`.
//! ) -> HandlerResult {
//! match msg.text().map(|text| text.parse::<u8>()) {
//! Some(Ok(age)) => {
//! bot.send_message(msg.chat.id, "What's your location?").await?;
@ -46,11 +48,12 @@
//! }
//! ```
//!
//! Variant's fields are passed to state handlers as tuples: `(full_name,):
//! (String,)`. Using [`Dialogue::update`], you can update the dialogue with a
//! new state, in our case -- `State::ReceiveLocation { full_name, age }`. To
//! exit the dialogue, just call [`Dialogue::exit`] and it will be removed from
//! the inner storage:
//! Variant's fields are passed to state handlers as single arguments like
//! `full_name: String` or tuples in case of two or more variant parameters (see
//! below). Using [`Dialogue::update`], you can update the dialogue with a new
//! state, in our case -- `State::ReceiveLocation { full_name, age }`. To exit
//! the dialogue, just call [`Dialogue::exit`] and it will be removed from the
//! underlying storage:
//!
//! ```ignore
//! async fn receive_location(

View file

@ -11,17 +11,20 @@ use crate::{
use dptree::di::{DependencyMap, DependencySupplier};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use std::{
collections::HashMap,
fmt::Debug,
hash::Hash,
ops::{ControlFlow, Deref},
sync::Arc,
};
use tokio::time::timeout;
use tokio_stream::wrappers::ReceiverStream;
use std::future::Future;
use std::{
collections::HashMap,
fmt::Debug,
future::Future,
hash::Hash,
ops::{ControlFlow, Deref},
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
};
/// The builder for [`Dispatcher`].
pub struct DispatcherBuilder<R, Err, Key> {
@ -30,6 +33,7 @@ pub struct DispatcherBuilder<R, Err, Key> {
handler: Arc<UpdateHandler<Err>>,
default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
ctrlc_handler: bool,
distribution_f: fn(&Update) -> Option<Key>,
worker_queue_size: usize,
}
@ -75,6 +79,14 @@ where
Self { dependencies, ..self }
}
/// Enables the `^C` handler that [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
pub fn enable_ctrlc_handler(self) -> Self {
Self { ctrlc_handler: true, ..self }
}
/// Specifies size of the queue for workers.
///
/// By default it's 64.
@ -98,6 +110,7 @@ where
handler,
default_handler,
error_handler,
ctrlc_handler,
distribution_f: _,
worker_queue_size,
} = self;
@ -108,6 +121,7 @@ where
handler,
default_handler,
error_handler,
ctrlc_handler,
distribution_f: f,
worker_queue_size,
}
@ -124,9 +138,10 @@ where
error_handler,
distribution_f,
worker_queue_size,
ctrlc_handler,
} = self;
Dispatcher {
let dp = Dispatcher {
bot,
dependencies,
handler,
@ -137,7 +152,20 @@ where
worker_queue_size,
workers: HashMap::new(),
default_worker: None,
current_number_of_active_workers: Default::default(),
max_number_of_active_workers: Default::default(),
};
#[cfg(feature = "ctrlc_handler")]
{
if ctrlc_handler {
let mut dp = dp;
dp.setup_ctrlc_handler_inner();
return dp;
}
}
dp
}
}
@ -158,6 +186,8 @@ pub struct Dispatcher<R, Err, Key> {
distribution_f: fn(&Update) -> Option<Key>,
worker_queue_size: usize,
current_number_of_active_workers: Arc<AtomicU32>,
max_number_of_active_workers: Arc<AtomicU32>,
// Tokio TX channel parts associated with chat IDs that consume updates sequentially.
workers: HashMap<Key, Worker>,
// The default TX part that consume updates concurrently.
@ -171,6 +201,7 @@ pub struct Dispatcher<R, Err, Key> {
struct Worker {
tx: tokio::sync::mpsc::Sender<Update>,
handle: tokio::task::JoinHandle<()>,
is_waiting: Arc<AtomicBool>,
}
// TODO: it is allowed to return message as response on telegram request in
@ -204,6 +235,7 @@ where
Box::pin(async {})
}),
error_handler: LoggingErrorHandler::new(),
ctrlc_handler: false,
worker_queue_size: DEFAULT_WORKER_QUEUE_SIZE,
distribution_f: default_distribution_function,
}
@ -214,7 +246,7 @@ impl<R, Err, Key> Dispatcher<R, Err, Key>
where
R: Requester + Clone + Send + Sync + 'static,
Err: Send + Sync + 'static,
Key: Hash + Eq,
Key: Hash + Eq + Clone,
{
/// Starts your bot with the default parameters.
///
@ -230,7 +262,6 @@ where
/// - [`crate::types::Me`] (can be used in [`HandlerExt::filter_command`]).
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
/// [`HandlerExt::filter_command`]: crate::dispatching::HandlerExt::filter_command
pub async fn dispatch(&mut self)
where
@ -250,7 +281,6 @@ where
/// This method adds the same dependencies as [`Dispatcher::dispatch`].
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a mut self,
mut update_listener: UListener,
@ -280,6 +310,8 @@ where
tokio::pin!(stream);
loop {
self.remove_inactive_workers_if_needed().await;
// False positive
#[allow(clippy::collapsible_match)]
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
@ -342,6 +374,8 @@ where
handler,
default_handler,
error_handler,
Arc::clone(&self.current_number_of_active_workers),
Arc::clone(&self.max_number_of_active_workers),
self.worker_queue_size,
)
}),
@ -367,11 +401,68 @@ where
}
}
async fn remove_inactive_workers_if_needed(&mut self) {
let workers = self.workers.len();
let max = self.max_number_of_active_workers.load(Ordering::Relaxed) as usize;
if workers <= max {
return;
}
self.remove_inactive_workers().await;
}
#[inline(never)] // Cold function.
async fn remove_inactive_workers(&mut self) {
let handles = self
.workers
.iter()
.filter(|(_, worker)| {
worker.tx.capacity() == self.worker_queue_size
&& worker.is_waiting.load(Ordering::Relaxed)
})
.map(|(k, _)| k)
.cloned()
.collect::<Vec<_>>()
.into_iter()
.map(|key| {
let Worker { tx, handle, .. } = self.workers.remove(&key).unwrap();
// Close channel, worker should stop almost immediately
// (it's been supposedly waiting on the channel)
drop(tx);
handle
});
for handle in handles {
// We must wait for worker to stop anyway, even though it should stop
// immediately. This helps in case if we've checked that the worker
// is waiting in between it received the update and set the flag.
let _ = handle.await;
}
}
/// Setups the `^C` handler that [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
#[deprecated(since = "0.10.0", note = "use `enable_ctrlc_handler` on builder instead")]
pub fn setup_ctrlc_handler(&mut self) -> &mut Self {
self.setup_ctrlc_handler_inner();
self
}
/// Returns a shutdown token, which can later be used to shutdown
/// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
self.state.clone()
}
}
impl<R, Err, Key> Dispatcher<R, Err, Key> {
#[cfg(feature = "ctrlc_handler")]
fn setup_ctrlc_handler_inner(&mut self) {
let token = self.state.clone();
tokio::spawn(async move {
loop {
@ -389,14 +480,6 @@ where
}
}
});
self
}
/// Returns a shutdown token, which can later be used to shutdown
/// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
self.state.clone()
}
}
@ -405,25 +488,40 @@ fn spawn_worker<Err>(
handler: Arc<UpdateHandler<Err>>,
default_handler: DefaultHandler,
error_handler: Arc<dyn ErrorHandler<Err> + Send + Sync>,
current_number_of_active_workers: Arc<AtomicU32>,
max_number_of_active_workers: Arc<AtomicU32>,
queue_size: usize,
) -> Worker
where
Err: Send + Sync + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel(queue_size);
let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size);
let is_waiting = Arc::new(AtomicBool::new(true));
let is_waiting_local = Arc::clone(&is_waiting);
let deps = Arc::new(deps);
let handle = tokio::spawn(ReceiverStream::new(rx).for_each(move |update| {
let deps = Arc::clone(&deps);
let handler = Arc::clone(&handler);
let default_handler = Arc::clone(&default_handler);
let error_handler = Arc::clone(&error_handler);
let handle = tokio::spawn(async move {
while let Some(update) = rx.recv().await {
is_waiting_local.store(false, Ordering::Relaxed);
{
let current = current_number_of_active_workers.fetch_add(1, Ordering::Relaxed) + 1;
max_number_of_active_workers.fetch_max(current, Ordering::Relaxed);
}
handle_update(update, deps, handler, default_handler, error_handler)
}));
let deps = Arc::clone(&deps);
let handler = Arc::clone(&handler);
let default_handler = Arc::clone(&default_handler);
let error_handler = Arc::clone(&error_handler);
Worker { tx, handle }
handle_update(update, deps, handler, default_handler, error_handler).await;
current_number_of_active_workers.fetch_sub(1, Ordering::Relaxed);
is_waiting_local.store(true, Ordering::Relaxed);
}
});
Worker { tx, handle, is_waiting }
}
fn spawn_default_worker<Err>(
@ -449,7 +547,7 @@ where
handle_update(update, deps, handler, default_handler, error_handler)
}));
Worker { tx, handle }
Worker { tx, handle, is_waiting: Arc::new(AtomicBool::new(true)) }
}
async fn handle_update<Err>(

View file

@ -1,7 +1,7 @@
use teloxide_core::types::{ChatId, Update};
/// Default distribution key for dispatching.
#[derive(Debug, Hash, PartialEq, Eq)]
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct DefaultKey(ChatId);
pub(crate) fn default_distribution_function(update: &Update) -> Option<DefaultKey> {

View file

@ -1,44 +1,27 @@
use std::collections::HashSet;
use dptree::{description::EventKind, HandlerDescription};
use dptree::{
description::{EventKind, InterestSet},
HandlerDescription,
};
use teloxide_core::types::AllowedUpdate;
/// Handler description that is used by [`Dispatcher`].
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
pub struct DpHandlerDescription {
allowed: EventKind<AllowedUpdate>,
allowed: InterestSet<Kind>,
}
impl DpHandlerDescription {
pub(crate) fn of(allowed: AllowedUpdate) -> Self {
let mut set = HashSet::with_capacity(1);
set.insert(allowed);
Self { allowed: EventKind::InterestList(set) }
set.insert(Kind(allowed));
Self { allowed: InterestSet::new_filter(set) }
}
pub(crate) fn allowed_updates(&self) -> Vec<AllowedUpdate> {
use AllowedUpdate::*;
match &self.allowed {
EventKind::InterestList(set) => set.iter().copied().collect(),
EventKind::Entry => panic!("No updates were allowed"),
EventKind::UserDefined => vec![
Message,
EditedMessage,
ChannelPost,
EditedChannelPost,
InlineQuery,
ChosenInlineResult,
CallbackQuery,
ShippingQuery,
PreCheckoutQuery,
Poll,
PollAnswer,
MyChatMember,
ChatMember,
],
}
self.allowed.observed.iter().map(|Kind(x)| x).copied().collect()
}
}
@ -59,3 +42,70 @@ impl HandlerDescription for DpHandlerDescription {
Self { allowed: self.allowed.merge_branch(&other.allowed) }
}
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
struct Kind(AllowedUpdate);
impl EventKind for Kind {
fn full_set() -> HashSet<Self> {
use AllowedUpdate::*;
[
Message,
EditedMessage,
ChannelPost,
EditedChannelPost,
InlineQuery,
ChosenInlineResult,
CallbackQuery,
ShippingQuery,
PreCheckoutQuery,
Poll,
PollAnswer,
MyChatMember,
ChatMember,
]
.into_iter()
.map(Kind)
.collect()
}
fn empty_set() -> HashSet<Self> {
HashSet::new()
}
}
#[cfg(test)]
mod tests {
use crate::{
dispatching::{HandlerExt, UpdateFilterExt},
types::{AllowedUpdate::*, Update},
utils::command::BotCommands,
};
use crate as teloxide; // fixup for the `BotCommands` macro
#[derive(BotCommands, Clone)]
#[command(rename = "lowercase")]
enum Cmd {
B,
}
// <https://github.com/teloxide/teloxide/discussions/648>
#[test]
fn discussion_648() {
let h =
dptree::entry().branch(Update::filter_my_chat_member().endpoint(|| async {})).branch(
Update::filter_message()
.branch(dptree::entry().filter_command::<Cmd>().endpoint(|| async {}))
.endpoint(|| async {}),
);
let mut v = h.description().allowed_updates();
// Hash set randomizes element order, so to compare we need to sort
v.sort_by_key(|&a| a as u8);
assert_eq!(v, [Message, MyChatMember])
}
}

View file

@ -14,7 +14,12 @@ use teloxide_core::requests::Requester;
///
/// All errors from an update listener and handler will be logged.
///
/// REPLs are meant only for simple bots and rapid prototyping. If you need to
/// supply dependencies or describe more complex dispatch logic, please use
/// [`Dispatcher`].
///
/// ## Caution
///
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
@ -49,7 +54,12 @@ where
///
/// All errors from an update listener and handler will be logged.
///
/// REPLs are meant only for simple bots and rapid prototyping. If you need to
/// supply dependencies or describe more complex dispatch logic, please use
/// [`Dispatcher`].
///
/// ## Caution
///
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
@ -86,8 +96,8 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, L, ListenerE, E, Args>(
Update::filter_message().filter_command::<Cmd>().chain(dptree::endpoint(handler)),
)
.default_handler(ignore_update)
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -11,7 +11,12 @@ use teloxide_core::requests::Requester;
///
/// All errors from an update listener and a handler will be logged.
///
/// # Caution
/// REPLs are meant only for simple bots and rapid prototyping. If you need to
/// supply dependencies or describe more complex dispatch logic, please use
/// [`Dispatcher`].
///
/// ## Caution
///
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
@ -35,7 +40,12 @@ where
///
/// All errors from an update listener and handler will be logged.
///
/// REPLs are meant only for simple bots and rapid prototyping. If you need to
/// supply dependencies or describe more complex dispatch logic, please use
/// [`Dispatcher`].
///
/// # Caution
///
/// **DO NOT** use this function together with [`Dispatcher`] and other REPLs,
/// because Telegram disallow multiple requests at the same time from the same
/// bot.
@ -61,8 +71,8 @@ where
Dispatcher::builder(bot, Update::filter_message().chain(dptree::endpoint(handler)))
.default_handler(ignore_update)
.enable_ctrlc_handler()
.build()
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -27,7 +27,7 @@
/// Implementations of webhook update listeners - an alternative (to
/// [`fn@polling`]) way of receiving updates from telegram.
#[cfg(any(feature = "webhooks-axum"))]
#[cfg(feature = "webhooks")]
pub mod webhooks;
use futures::Stream;
@ -42,8 +42,9 @@ use crate::{
mod polling;
mod stateful_listener;
#[allow(deprecated)]
pub use self::{
polling::{polling, polling_default},
polling::{polling, polling_default, Polling, PollingBuilder, PollingStream},
stateful_listener::StatefulListener,
};
@ -125,3 +126,11 @@ pub trait AsUpdateStream<'a, E> {
/// [`Stream`]: AsUpdateStream::Stream
fn as_stream(&'a mut self) -> Self::Stream;
}
#[inline(always)]
pub(crate) fn assert_update_listener<L, E>(listener: L) -> L
where
L: UpdateListener<E>,
{
listener
}

View file

@ -1,89 +1,200 @@
use std::{convert::TryInto, time::Duration};
use futures::{
future::{ready, Either},
stream::{self, Stream, StreamExt},
use std::{
convert::TryInto,
future::Future,
pin::Pin,
task::{
self,
Poll::{self, Ready},
},
time::Duration,
vec,
};
use futures::{ready, stream::Stream};
use crate::{
dispatching::{
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener},
update_listeners::{assert_update_listener, AsUpdateStream, UpdateListener},
},
payloads::{GetUpdates, GetUpdatesSetters as _},
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, Update},
};
/// Returns a long polling update listener with `timeout` of 10 seconds.
/// Builder for polling update listener.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
/// Can be created by [`Polling::builder`].
#[non_exhaustive]
#[must_use = "`PollingBuilder` is a builder and does nothing unless used"]
pub struct PollingBuilder<R> {
pub bot: R,
pub timeout: Option<Duration>,
pub limit: Option<u8>,
pub allowed_updates: Option<Vec<AllowedUpdate>>,
pub drop_pending_updates: bool,
}
impl<R> PollingBuilder<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
/// A timeout in seconds for polling.
///
/// ## Note
///
/// `timeout` should not be bigger than http client timeout, see
/// [`default_reqwest_settings`] for default http client settings.
///
/// [`default_reqwest_settings`]: crate::net::default_reqwest_settings
pub fn timeout(self, timeout: Duration) -> Self {
Self { timeout: Some(timeout), ..self }
}
/// Limit the number of updates to be retrieved at once. Values between
/// 1—100 are accepted.
///
/// ## Panics
///
/// If `limit` is 0 or greater than 100.
#[track_caller]
pub fn limit(self, limit: u8) -> Self {
assert_ne!(limit, 0, "limit can't be 0");
assert!(limit <= 100, "maximum limit is 100, can't set limit to `{limit}`");
Self { limit: Some(limit), ..self }
}
/// A list of the types of updates you want to receive.
///
/// ## Note
///
/// Teloxide normally (when using [`Dispatcher`] or [`repl`]s) sets this
/// automatically via [`hint_allowed_updates`], so you rarely need to use
/// `allowed_updates` explicitly.
///
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`repl`]: fn@crate::repl
/// [`hint_allowed_updates`]: crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates
pub fn allowed_updates(self, allowed_updates: Vec<AllowedUpdate>) -> Self {
Self { allowed_updates: Some(allowed_updates), ..self }
}
/// Drops pending updates.
pub fn drop_pending_updates(self) -> Self {
Self { drop_pending_updates: true, ..self }
}
/// Deletes webhook if it was set up.
pub async fn delete_webhook(self) -> Self {
delete_webhook_if_setup(&self.bot).await;
self
}
/// Returns a long polling update listener with configuration from the
/// builder.
///
/// See also: [`polling_default`], [`Polling`].
pub fn build(self) -> Polling<R> {
let Self { bot, timeout, limit, allowed_updates, drop_pending_updates } = self;
let (token, flag) = AsyncStopToken::new_pair();
let polling =
Polling { bot, timeout, limit, allowed_updates, drop_pending_updates, flag, token };
assert_update_listener(polling)
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Returns a long polling update listener with some additional options.
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout in seconds for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
///
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
/// See also: [`Polling::builder`].
///
/// ## Notes
///
/// - `timeout` should not be bigger than http client timeout, see
/// [`default_reqwest_settings`] for default http client settings.
/// - [`repl`]s and [`Dispatcher`] use [`hint_allowed_updates`] to set
/// `allowed_updates`, so you rarely need to pass `allowed_updates`
/// explicitly.
///
/// [`default_reqwest_settings`]: teloxide::net::default_reqwest_settings
/// [`repl`]: fn@crate::repl
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`hint_allowed_updates`]:
/// crate::dispatching::update_listeners::UpdateListener::hint_allowed_updates
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(bot: R) -> Polling<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
let polling =
Polling::builder(bot).timeout(Duration::from_secs(10)).delete_webhook().await.build();
assert_update_listener(polling)
}
/// Returns a long polling update listener with some additional options.
#[deprecated(since = "0.10.0", note = "use `Polling::builder()` instead")]
pub fn polling<R>(
bot: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> Polling<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
let mut builder = Polling::builder(bot);
builder.timeout = timeout;
builder.limit = limit;
builder.allowed_updates = allowed_updates;
assert_update_listener(builder.build())
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = webhook_info.url.is_some();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A polling update listener.
///
/// ## How it works
///
/// Long polling works by repeatedly calling [`Bot::get_updates`][get_updates].
/// If telegram has any updates, it returns them immediately, otherwise it waits
/// until either it has any updates or `timeout` expires.
/// Long polling works by repeatedly calling
/// [`Bot::get_updates`][get_updates]. If telegram has any updates, it
/// returns them immediately, otherwise it waits until either it has any
/// updates or `timeout` expires.
///
/// Each [`get_updates`][get_updates] call includes an `offset` parameter equal
/// to the latest update id + one, that allows to only receive updates that has
/// not been received before.
/// Each [`get_updates`][get_updates] call includes an `offset` parameter
/// equal to the latest update id + one, that allows to only receive
/// updates that has not been received before.
///
/// When telegram receives a [`get_updates`][get_updates] request with `offset =
/// N` it forgets any updates with id < `N`. When `polling` listener is stopped,
/// it sends [`get_updates`][get_updates] with `timeout = 0, limit = 1` and
/// appropriate `offset`, so future bot restarts won't see updates that were
/// already seen.
/// When telegram receives a [`get_updates`][get_updates] request with
/// `offset = N` it forgets any updates with id < `N`. When `polling`
/// listener is stopped, it sends [`get_updates`][get_updates] with
/// `timeout = 0, limit = 1` and appropriate `offset`, so future bot
/// restarts won't see updates that were already seen.
///
/// Consumers of a `polling` update listener then need to repeatedly call
/// Consumers of a [`Polling`] update listener then need to repeatedly call
/// [`futures::StreamExt::next`] to get the updates.
///
/// Here is an example diagram that shows these interactions between consumers
/// like [`Dispatcher`], `polling` update listener and telegram.
/// Here is an example diagram that shows these interactions between
/// consumers like [`Dispatcher`], [`Polling`] update listener and
/// telegram.
///
/// ```mermaid
/// sequenceDiagram
/// participant C as Consumer
/// participant P as polling
/// participant P as Polling
/// participant T as Telegram
///
/// link C: Dispatcher @ ../struct.Dispatcher.html
@ -123,131 +234,180 @@ where
/// ```
///
/// [get_updates]: crate::requests::Requester::get_updates
pub fn polling<R>(
bot: R,
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[must_use = "`Polling` is an update listener and does nothing unless used"]
pub struct Polling<B: Requester> {
bot: B,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
drop_pending_updates: bool,
flag: AsyncStopFlag,
token: AsyncStopToken,
}
impl<R> Polling<R>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdates: Send,
{
struct State<B: Requester> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
flag: AsyncStopFlag,
token: AsyncStopToken,
force_stop: bool,
/// Returns a builder for polling update listener.
pub fn builder(bot: R) -> PollingBuilder<R> {
PollingBuilder {
bot,
timeout: None,
limit: None,
allowed_updates: None,
drop_pending_updates: false,
}
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_
where
B: Requester + Send,
<B as Requester>::GetUpdates: Send,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, force_stop, .. } =
&mut *state;
if *force_stop {
return None;
}
if flag.is_stopped() {
let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1);
req.payload_mut().allowed_updates = allowed_updates.take();
return match req.send().await {
Ok(_) => None,
Err(err) => {
// Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496
*force_stop = true;
Some((Either::Left(stream::once(ready(Err(err)))), state))
}
};
}
let mut req = bot.get_updates();
*req.payload_mut() = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};
match req.send().await {
Ok(updates) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
*offset = upd.id + 1;
}
let updates = updates.into_iter().map(Ok);
Some((Either::Right(stream::iter(updates)), state))
}
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
}
})
.flatten()
}
let (token, flag) = AsyncStopToken::new_pair();
let state = State {
bot,
timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit,
allowed_updates,
offset: 0,
flag,
token,
force_stop: false,
};
let stop_token = |st: &mut State<_>| st.token.clone();
let hint_allowed_updates =
Some(|state: &mut State<_>, allowed: &mut dyn Iterator<Item = AllowedUpdate>| {
// TODO: we should probably warn if there already were different allowed updates
// before
state.allowed_updates = Some(allowed.collect());
});
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint)
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
#[pin_project::pin_project]
pub struct PollingStream<'a, B: Requester> {
/// Parent structure
polling: &'a mut Polling<B>,
let is_webhook_setup = webhook_info.url.is_some();
/// Whatever to drop pending updates or not.
drop_pending_updates: bool,
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
/// Timeout parameter for normal `get_updates()` calls.
timeout: Option<u32>,
/// Allowed updates parameter for the first `get_updates()` call.
allowed_updates: Option<Vec<AllowedUpdate>>,
/// Offset parameter for normal `get_updates()` calls.
offset: i32,
/// If this is set, return `None` from `poll_next` immediately.
force_stop: bool,
/// If true we've sent last `get_updates()` call for graceful shutdown.
stopping: bool,
/// Buffer of updates to be yielded.
buffer: vec::IntoIter<Update>,
/// In-flight `get_updates()` call.
#[pin]
in_flight: Option<<B::GetUpdates as Request>::Send>,
}
impl<B: Requester + Send + 'static> UpdateListener<B::Err> for Polling<B> {
type StopToken = AsyncStopToken;
fn stop_token(&mut self) -> Self::StopToken {
self.token.clone()
}
fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) {
// TODO: we should probably warn if there already were different allowed updates
// before
self.allowed_updates = Some(hint.collect());
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout
}
}
impl<'a, B: Requester + Send + 'a> AsUpdateStream<'a, B::Err> for Polling<B> {
type Stream = PollingStream<'a, B>;
fn as_stream(&'a mut self) -> Self::Stream {
let timeout = self.timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
let allowed_updates = self.allowed_updates.clone();
let drop_pending_updates = self.drop_pending_updates;
PollingStream {
polling: self,
drop_pending_updates,
timeout,
allowed_updates,
offset: 0,
force_stop: false,
stopping: false,
buffer: Vec::new().into_iter(),
in_flight: None,
}
}
}
impl<B: Requester> Stream for PollingStream<'_, B> {
type Item = Result<Update, B::Err>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
if *this.force_stop {
return Ready(None);
}
// Poll in-flight future until completion
if let Some(in_flight) = this.in_flight.as_mut().as_pin_mut() {
let res = ready!(in_flight.poll(cx));
this.in_flight.set(None);
match res {
Ok(_) if *this.stopping => return Ready(None),
Err(err) if *this.stopping => {
// Prevents infinite retries, see https://github.com/teloxide/teloxide/issues/496
*this.force_stop = true;
return Ready(Some(Err(err)));
}
Ok(updates) => {
if let Some(upd) = updates.last() {
*this.offset = upd.id + 1;
}
match *this.drop_pending_updates {
false => *this.buffer = updates.into_iter(),
true => *this.drop_pending_updates = false,
}
}
Err(err) => return Ready(Some(Err(err))),
}
}
// If there are any buffered updates, return one
if let Some(upd) = this.buffer.next() {
return Ready(Some(Ok(upd)));
}
*this.stopping = this.polling.flag.is_stopped();
let (offset, limit, timeout) = match (this.stopping, this.drop_pending_updates) {
// Normal `get_updates()` call
(false, false) => (*this.offset, this.polling.limit, *this.timeout),
// Graceful shutdown `get_updates()` call (shutdown takes priority over dropping pending
// updates)
//
// When stopping we set `timeout = 0` and `limit = 1` so that `get_updates()`
// set last seen update (offset) and return immediately
(true, _) => (*this.offset, Some(1), Some(0)),
// Drop pending updates
(_, true) => (-1, Some(1), Some(0)),
};
let req = this
.polling
.bot
.get_updates()
.with_payload_mut(|pay| {
pay.offset = Some(offset);
pay.timeout = timeout;
pay.limit = limit;
pay.allowed_updates = this.allowed_updates.take();
})
.send();
this.in_flight.set(Some(req));
// Recurse to poll `self.in_flight`
self.poll_next(cx)
}
}
#[test]
fn polling_is_send() {
use crate::dispatching::update_listeners::AsUpdateStream;
let bot = crate::Bot::new("TOKEN");
#[allow(deprecated)]
let mut polling = polling(bot, None, None, None);
assert_send(&polling);

View file

@ -4,6 +4,7 @@ use std::net::SocketAddr;
use crate::{requests::Requester, types::InputFile};
/// Options related to setting up webhooks.
#[must_use]
pub struct Options {
/// Local address to listen to.
pub address: SocketAddr,
@ -42,13 +43,28 @@ pub struct Options {
///
/// Default - false.
pub drop_pending_updates: bool,
/// A secret token to be sent in a header “X-Telegram-Bot-Api-Secret-Token”
/// in every webhook request, 1-256 characters. Only characters `A-Z`,
/// `a-z`, `0-9`, `_` and `-` are allowed. The header is useful to ensure
/// that the request comes from a webhook set by you.
///
/// Default - teloxide will generate a random token.
pub secret_token: Option<String>,
}
impl Options {
/// Construct a new webhook options, see [`Options::address`] and
/// [`Options::url`] for details.
pub fn new(address: SocketAddr, url: url::Url) -> Self {
Self { address, url, certificate: None, max_connections: None, drop_pending_updates: false }
Self {
address,
url,
certificate: None,
max_connections: None,
drop_pending_updates: false,
secret_token: None,
}
}
/// Upload your public key certificate so that the root certificate in use
@ -71,6 +87,32 @@ impl Options {
pub fn drop_pending_updates(self) -> Self {
Self { drop_pending_updates: true, ..self }
}
/// A secret token to be sent in a header “X-Telegram-Bot-Api-Secret-Token”
/// in every webhook request, 1-256 characters. Only characters `A-Z`,
/// `a-z`, `0-9`, `_` and `-` are allowed. The header is useful to ensure
/// that the request comes from a webhook set by you.
///
/// ## Panics
///
/// If the token is invalid.
#[track_caller]
pub fn secret_token(self, token: String) -> Self {
check_secret(token.as_bytes()).expect("Invalid secret token");
Self { secret_token: Some(token), ..self }
}
/// Returns `self.secret_token`, generating a new one if it's `None`.
///
/// After a call to this function `self.secret_token` is always `Some(_)`.
///
/// **Note**: if you leave webhook setup to teloxide, it will automatically
/// generate a secret token. Call this function only if you need to know the
/// secret (for example because you are calling `set_webhook` by yourself).
pub fn get_or_gen_secret_token(&mut self) -> &str {
self.secret_token.get_or_insert_with(gen_secret_token)
}
}
#[cfg(feature = "webhooks-axum")]
@ -91,6 +133,7 @@ where
use crate::requests::Request;
use teloxide_core::requests::HasPayload;
let secret = options.get_or_gen_secret_token().to_owned();
let &mut Options {
ref url, ref mut certificate, max_connections, drop_pending_updates, ..
} = options;
@ -99,12 +142,47 @@ where
req.payload_mut().certificate = certificate.take();
req.payload_mut().max_connections = max_connections;
req.payload_mut().drop_pending_updates = Some(drop_pending_updates);
req.payload_mut().secret_token = Some(secret);
req.send().await?;
Ok(())
}
/// Generates a random string consisting of 32 characters (`a-z`, `A-Z`, `0-9`,
/// `_` and `-`).
fn gen_secret_token() -> String {
use rand::{distributions::Uniform, Rng};
const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-";
const SECRET_LENGTH: usize = 32;
let random = rand::thread_rng()
.sample_iter(Uniform::new(0, CHARSET.len()))
.map(|idx| CHARSET[idx] as char)
.take(SECRET_LENGTH);
let mut secret = String::with_capacity(SECRET_LENGTH);
secret.extend(random);
secret
}
fn check_secret(bytes: &[u8]) -> Result<&[u8], &'static str> {
let len = bytes.len();
if !(1..=256).contains(&len) {
return Err("secret token length must be in range 1..=256");
}
let is_not_supported =
|c: &_| !matches!(c, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_' | b'-');
if bytes.iter().any(is_not_supported) {
return Err("secret token must only contain of `a-z`, `A-Z`, `0-9`, `_` and `-` characters");
}
Ok(bytes)
}
/// Returns first (`.0`) field from a tuple as a `&mut` reference.
///
/// This hack is needed because there isn't currently a way to easily force a

View file

@ -1,12 +1,14 @@
use std::convert::Infallible;
use std::{convert::Infallible, future::Future, pin::Pin};
use axum::{
extract::{FromRequest, RequestParts},
http::status::StatusCode,
};
use crate::{
dispatching::{
stop_token::{AsyncStopFlag, StopToken},
update_listeners::{
webhooks::{setup_webhook, tuple_first_mut, Options},
UpdateListener,
},
update_listeners::{webhooks::Options, UpdateListener},
},
requests::Requester,
};
@ -105,15 +107,12 @@ where
pub async fn axum_to_router<R>(
bot: R,
mut options: Options,
) -> Result<
(impl UpdateListener<Infallible>, impl std::future::Future<Output = ()> + Send, axum::Router),
R::Err,
>
) -> Result<(impl UpdateListener<Infallible>, impl Future<Output = ()> + Send, axum::Router), R::Err>
where
R: Requester + Send,
<R as Requester>::DeleteWebhook: Send,
{
use crate::requests::Request;
use crate::{dispatching::update_listeners::webhooks::setup_webhook, requests::Request};
use futures::FutureExt;
setup_webhook(&bot, &mut options).await?;
@ -149,12 +148,15 @@ where
/// function.
pub fn axum_no_setup(
options: Options,
) -> (impl UpdateListener<Infallible>, impl std::future::Future<Output = ()>, axum::Router) {
) -> (impl UpdateListener<Infallible>, impl Future<Output = ()>, axum::Router) {
use crate::{
dispatching::{stop_token::AsyncStopToken, update_listeners},
dispatching::{
stop_token::AsyncStopToken,
update_listeners::{self, webhooks::tuple_first_mut},
},
types::Update,
};
use axum::{extract::Extension, http::StatusCode, response::IntoResponse, routing::post};
use axum::{extract::Extension, response::IntoResponse, routing::post};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tower::ServiceBuilder;
@ -167,9 +169,16 @@ pub fn axum_no_setup(
async fn telegram_request(
input: String,
secret_header: XTelegramBotApiSecretToken,
secret: Extension<Option<String>>,
tx: Extension<CSender>,
flag: Extension<AsyncStopFlag>,
) -> impl IntoResponse {
// FIXME: use constant time comparison here
if secret_header.0.as_deref() != secret.as_deref().map(str::as_bytes) {
return StatusCode::UNAUTHORIZED;
}
let tx = match tx.get() {
None => return StatusCode::SERVICE_UNAVAILABLE,
// Do not process updates after `.stop()` is called even if the server is still
@ -206,6 +215,7 @@ pub fn axum_no_setup(
.layer(TraceLayer::new_for_http())
.layer(Extension(ClosableSender::new(tx)))
.layer(Extension(stop_flag.clone()))
.layer(Extension(options.secret_token))
.into_inner(),
);
@ -245,3 +255,32 @@ impl<T> ClosableSender<T> {
self.origin.write().unwrap().take();
}
}
struct XTelegramBotApiSecretToken(Option<Vec<u8>>);
impl<B> FromRequest<B> for XTelegramBotApiSecretToken {
type Rejection = StatusCode;
fn from_request<'l0, 'at>(
req: &'l0 mut RequestParts<B>,
) -> Pin<Box<dyn Future<Output = Result<Self, Self::Rejection>> + Send + 'at>>
where
'l0: 'at,
Self: 'at,
{
use crate::dispatching::update_listeners::webhooks::check_secret;
let res = req
.headers_mut()
.remove("x-telegram-bot-api-secret-token")
.map(|header| {
check_secret(header.as_bytes())
.map(<_>::to_owned)
.map_err(|_| StatusCode::BAD_REQUEST)
})
.transpose()
.map(Self);
Box::pin(async { res }) as _
}
}

34
src/features.md Normal file
View file

@ -0,0 +1,34 @@
## Cargo features
| Feature | Description |
|----------------------|--------------------------------------------------------------------------------------------|
| `webhooks` | Enables general webhook utilities (almost useless on its own) |
| `webhooks-axum` | Enables webhook implementation based on axum framework |
| `macros` | Re-exports macros from [`teloxide-macros`]. |
| `ctrlc_handler` | Enables the [`DispatcherBuilder::enable_ctrlc_handler`] function (**enabled by default**). |
| `auto-send` | Enables the [`AutoSend`](adaptors::AutoSend) bot adaptor (**enabled by default**). |
| `throttle` | Enables the [`Throttle`](adaptors::Throttle) bot adaptor. |
| `cache-me` | Enables the [`CacheMe`](adaptors::CacheMe) bot adaptor. |
| `trace-adaptor` | Enables the [`Trace`](adaptors::Trace) bot adaptor. |
| `erased` | Enables the [`ErasedRequester`](adaptors::ErasedRequester) bot adaptor. |
| `full` | Enables all the features except `nightly`. |
| `nightly` | Enables nightly-only features (see the [teloxide-core features]). |
| `native-tls` | Enables the [`native-tls`] TLS implementation (**enabled by default**). |
| `rustls` | Enables the [`rustls`] TLS implementation. |
| `redis-storage` | Enables the [Redis] storage support for dialogues. |
| `sqlite-storage` | Enables the [Sqlite] storage support for dialogues. |
| `cbor-serializer` | Enables the [CBOR] serializer for dialogues. |
| `bincode-serializer` | Enables the [Bincode] serializer for dialogues. |
[Redis]: https://redis.io/
[Sqlite]: https://www.sqlite.org/
[CBOR]: https://en.wikipedia.org/wiki/CBOR
[Bincode]: https://github.com/servo/bincode
[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros
[`native-tls`]: https://docs.rs/native-tls
[`rustls`]: https://docs.rs/rustls
[`teloxide::utils::UpState`]: utils::UpState
[teloxide-core features]: https://docs.rs/teloxide-core/latest/teloxide_core/#cargo-features
[`DispatcherBuilder::enable_ctrlc_handler`]: dispatching::DispatcherBuilder::enable_ctrlc_handler

View file

@ -1,29 +0,0 @@
## Cargo features
| Feature | Description |
|----------|----------|
| `redis-storage` | Enables the [Redis] storage support for dialogues.|
| `sqlite-storage` | Enables the [Sqlite] storage support for dialogues. |
| `cbor-serializer` | Enables the [CBOR] serializer for dialogues. |
| `bincode-serializer` | Enables the [Bincode] serializer for dialogues. |
| `macros` | Re-exports macros from [`teloxide-macros`]. |
| `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). |
| `rustls` | Enables the [`rustls`] TLS implementation. |
| `ctrlc_handler` | Enables the [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. |
| `auto-send` | Enables the [`AutoSend`](adaptors::AutoSend) bot adaptor. |
| `throttle` | Enables the [`Throttle`](adaptors::Throttle) bot adaptor. |
| `cache-me` | Enables the [`CacheMe`](adaptors::CacheMe) bot adaptor. |
| `trace-adaptor` | Enables the [`Trace`](adaptors::Trace) bot adaptor. |
| `erased` | Enables the [`ErasedRequester`](adaptors::ErasedRequester) bot adaptor. |
| `full` | Enables all the features except `nightly`. |
| `nightly` | Enables nightly-only features (see the [teloxide-core features]). |
[Redis]: https://redis.io/
[Sqlite]: https://www.sqlite.org/
[CBOR]: https://en.wikipedia.org/wiki/CBOR
[Bincode]: https://github.com/servo/bincode
[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros
[`native-tls`]: https://docs.rs/native-tls
[`rustls`]: https://docs.rs/rustls
[`teloxide::utils::UpState`]: utils::UpState
[teloxide-core features]: https://docs.rs/teloxide-core/latest/teloxide_core/#cargo-features

View file

@ -38,21 +38,17 @@
// [1]: https://github.com/rust-lang/rustfmt/issues/4210
// [2]: https://github.com/rust-lang/rustfmt/issues/4787
// [3]: https://github.com/rust-lang/rust/issues/82768#issuecomment-803935643
#![cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("features.txt")))]
#![cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("features.md")))]
// https://github.com/teloxide/teloxide/raw/master/logo.svg doesn't work in html_logo_url, I don't know why.
#![doc(
html_logo_url = "https://github.com/teloxide/teloxide/raw/master/ICON.png",
html_favicon_url = "https://github.com/teloxide/teloxide/raw/master/ICON.png"
)]
// We pass "--cfg docsrs" when building docs to add `This is supported on
// feature="..." only.`
//
// "--cfg dep_docsrs" is used for the same reason, but for `teloxide-core`.
//
// To properly build docs of this crate run
// ```console
// $ RUSTFLAGS="--cfg dep_docsrs" RUSTDOCFLAGS="--cfg docsrs -Znormalize-docs" cargo +nightly doc --open --all-features
// $ cargo docs --open
// ```
// (docs is an alias from `.cargo/config.toml`)
#![cfg_attr(all(docsrs, feature = "nightly"), feature(doc_cfg, doc_auto_cfg))]
#![forbid(unsafe_code)]
#![warn(rustdoc::broken_intra_doc_links)]

View file

@ -191,6 +191,8 @@ mod tests {
last_name: None,
username: Some("abcd".to_string()),
language_code: None,
is_premium: false,
added_to_attachment_menu: false,
};
assert_eq!(user_mention_or_link(&user_with_username), "@abcd");
let user_without_username = User {
@ -200,6 +202,8 @@ mod tests {
last_name: None,
username: None,
language_code: None,
is_premium: false,
added_to_attachment_menu: false,
};
assert_eq!(
user_mention_or_link(&user_without_username),

View file

@ -240,6 +240,8 @@ mod tests {
last_name: None,
username: Some("abcd".to_string()),
language_code: None,
is_premium: false,
added_to_attachment_menu: false,
};
assert_eq!(user_mention_or_link(&user_with_username), "@abcd");
let user_without_username = User {
@ -249,6 +251,8 @@ mod tests {
last_name: None,
username: None,
language_code: None,
is_premium: false,
added_to_attachment_menu: false,
};
assert_eq!(
user_mention_or_link(&user_without_username),

View file

@ -2,7 +2,7 @@
#![allow(clippy::nonstandard_macro_braces)]
#[cfg(feature = "macros")]
use teloxide::utils::command::{BotCommands, ParseError};
use teloxide::utils::command::BotCommands;
// We put tests here because macro expand in unit tests in module
// teloxide::utils::command was a failure
@ -141,22 +141,33 @@ fn parse_with_split2() {
#[test]
#[cfg(feature = "macros")]
fn parse_custom_parser() {
fn custom_parse_function(s: String) -> Result<(u8, String), ParseError> {
let vec = s.split_whitespace().collect::<Vec<_>>();
let (left, right) = match vec.as_slice() {
[l, r] => (l, r),
_ => return Err(ParseError::IncorrectFormat("might be 2 arguments!".into())),
};
left.parse::<u8>()
.map(|res| (res, (*right).to_string()))
.map_err(|_| ParseError::Custom("First argument must be a integer!".to_owned().into()))
mod parser {
use teloxide::utils::command::ParseError;
pub fn custom_parse_function(s: String) -> Result<(u8, String), ParseError> {
let vec = s.split_whitespace().collect::<Vec<_>>();
let (left, right) = match vec.as_slice() {
[l, r] => (l, r),
_ => return Err(ParseError::IncorrectFormat("might be 2 arguments!".into())),
};
left.parse::<u8>().map(|res| (res, (*right).to_string())).map_err(|_| {
ParseError::Custom("First argument must be a integer!".to_owned().into())
})
}
}
use parser::custom_parse_function;
#[derive(BotCommands, Debug, PartialEq)]
#[command(rename = "lowercase")]
enum DefaultCommands {
#[command(parse_with = "custom_parse_function")]
Start(u8, String),
// Test <https://github.com/teloxide/teloxide/issues/668>.
#[command(parse_with = "parser::custom_parse_function")]
TestPath(u8, String),
Help,
}
@ -164,6 +175,10 @@ fn parse_custom_parser() {
DefaultCommands::Start(10, "hello".to_string()),
DefaultCommands::parse("/start 10 hello", "").unwrap()
);
assert_eq!(
DefaultCommands::TestPath(10, "hello".to_string()),
DefaultCommands::parse("/testpath 10 hello", "").unwrap()
);
}
#[test]