Merge pull request #412 from teloxide/dev

Sponge Bob
This commit is contained in:
Hirrolot 2021-07-08 03:02:02 -07:00 committed by GitHub
commit 5de1a54221
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
47 changed files with 1693 additions and 569 deletions

4
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View file

@ -0,0 +1,4 @@
contact_links:
- name: Teloxide Discussions
url: https://github.com/teloxide/teloxide/discussions/categories/q-a
about: Please ask and answer questions here.

View file

@ -2,7 +2,7 @@
name: Parse error
about: Report issue with `teloxide` parsing of telegram response
title: 'Parse Error: <type or error description>'
labels: FIXME, bug
labels: bug, FIXME, core
assignees: WaffleLapkin
---

View file

@ -2,7 +2,7 @@
name: Unknown telegram error
about: You've found telegram error which is not known to teloxide
title: 'Unknown Error: <error description>'
labels: FIXME, bug, good first issue
labels: bug, good first issue, FIXME, core, Unknown API error
assignees: ''
---

View file

@ -6,7 +6,53 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [unreleased]
## [0.4.0] - 2021-03-19
## [0.5.0] - 2021-07-08
### Added
- `Storage::get_dialogue` to obtain a dialogue indexed by a chat ID.
- `InMemStorageError` with a single variant `DialogueNotFound` to be returned from `InMemStorage::remove_dialogue`.
- `RedisStorageError::DialogueNotFound` and `SqliteStorageError::DialogueNotFound` to be returned from `Storage::remove_dialogue`.
- A way to `shutdown` dispatcher
- `Dispatcher::shutdown_token` function.
- `ShutdownToken` with a `shutdown` function.
- `Dispatcher::setup_ctrlc_handler` function ([issue 153](https://github.com/teloxide/teloxide/issues/153)).
- `IdleShutdownError`
- Automatic update filtering ([issue 389](https://github.com/teloxide/teloxide/issues/389)).
- Added reply shortcut to every kind of messages ([PR 404](https://github.com/teloxide/teloxide/pull/404)).
### Changed
- Do not return a dialogue from `Storage::{remove_dialogue, update_dialogue}`.
- Return an error from `Storage::remove_dialogue` if a dialogue does not exist.
- Require `D: Clone` in `dialogues_repl(_with_listener)` and `InMemStorage`.
- Automatically delete a webhook if it was set up in `update_listeners::polling_default` (thereby making it `async`, [issue 319](https://github.com/teloxide/teloxide/issues/319)).
- `polling` and `polling_default` now require `R: 'static`
- Refactor `UpdateListener` trait:
- Add a `StopToken` associated type.
- It must implement a new `StopToken` trait which has the only function `fn stop(self);`
- Add a `stop_token` function that returns `Self::StopToken` and allows stopping the listener later ([issue 166](https://github.com/teloxide/teloxide/issues/166)).
- Remove blanked implementation.
- Remove `Stream` from super traits.
- Add `AsUpdateStream` to super traits.
- Add an `AsUpdateStream` trait that allows turning implementors into streams of updates (GAT workaround).
- Add a `timeout_hint` function (with a default implementation).
- `Dispatcher::dispatch` and `Dispatcher::dispatch_with_listener` now require mutable reference to self.
- Repls can now be stopped by `^C` signal.
- `Noop` and `AsyncStopToken`stop tokens.
- `StatefulListener`.
- Emit not only errors but also warnings and general information from teloxide, when set up by `enable_logging!`.
- Use `i64` instead of `i32` for `user_id` in `html::user_mention` and `markdown::user_mention`.
- Updated to `teloxide-core` `v0.3.0` (see it's [changelog](https://github.com/teloxide/teloxide-core/blob/master/CHANGELOG.md#030---2021-07-05) for more)
### Fixed
- Remove the `reqwest` dependency. It's not needed after the [teloxide-core] integration.
- A storage persistency bug ([issue 304](https://github.com/teloxide/teloxide/issues/304)).
- Log errors from `Storage::{remove_dialogue, update_dialogue}` in `DialogueDispatcher` ([issue 302](https://github.com/teloxide/teloxide/issues/302)).
- Mark all the functions of `Storage` as `#[must_use]`.
## [0.4.0] - 2021-03-22
### Added
- Integrate [teloxide-core].

View file

@ -124,3 +124,4 @@ C: Into<String>, { ... }
1. Use `Into<...>` only where there exists at least one conversion **and** it will be logically to use.
2. Always mark a function as `#[must_use]` if its return value **must** be used.
3. `Box::pin(async [move] { ... })` instead of `async [move] { ... }.boxed()`.
4. Always write `log::<op>!(...)` instead of importing `use log::<op>;` and invoking `<op>!(...)`. For example, write `log::info!("blah")`.

View file

@ -1,16 +1,17 @@
[package]
name = "teloxide"
version = "0.4.0"
version = "0.5.0"
edition = "2018"
description = "An elegant Telegram bots framework for Rust"
repository = "https://github.com/teloxide/teloxide"
documentation = "https://docs.rs/teloxide/"
readme = "README.md"
keywords = ["teloxide", "telegram", "telegram-bot", "telegram-bot-api"]
categories = ["web-programming", "api-bindings", "asynchronous"]
license = "MIT"
exclude = ["media"]
authors = [
"Temirkhan Myrzamadi <hirrolot@gmail.com>",
"Hirrolot <hirrolot@gmail.com>",
"Waffle Lapkin <waffle.lapkin@gmail.com>",
"p0lunin <dmytro.polunin@gmail.com>",
"Mishko torop'izhko",
@ -24,6 +25,8 @@ authors = [
maintenance = { status = "actively-developed" }
[features]
default = ["native-tls", "ctrlc_handler", "teloxide-core/default"]
sqlite-storage = ["sqlx"]
redis-storage = ["redis"]
cbor-serializer = ["serde_cbor"]
@ -32,6 +35,8 @@ bincode-serializer = ["bincode"]
frunk- = ["frunk"]
macros = ["teloxide-macros"]
ctrlc_handler = ["tokio/signal"]
native-tls = ["teloxide-core/native-tls"]
rustls = ["teloxide-core/rustls"]
auto-send = ["teloxide-core/auto_send"]
@ -49,6 +54,7 @@ full = [
"bincode-serializer",
"frunk",
"macros",
"ctrlc_handler",
"teloxide-core/full",
"native-tls",
"rustls",
@ -58,19 +64,19 @@ full = [
]
[dependencies]
teloxide-core = { version = "0.2.1", default-features = false }
teloxide-core = { version = "0.3.1", default-features = false }
#teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "...", default-features = false }
teloxide-macros = { version = "0.4", optional = true }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.2", features = ["fs"] }
tokio = { version = "1.8", features = ["fs"] }
tokio-util = "0.6"
tokio-stream = "0.1"
reqwest = { version = "0.11", features = ["json", "stream"] }
flurry = "0.3"
log = "0.4"
lockfree = "0.5.1"
bytes = "1.0"
mime = "0.3"
@ -89,18 +95,19 @@ sqlx = { version = "0.5", optional = true, default-features = false, features =
redis = { version = "0.20", features = ["tokio-comp"], optional = true }
serde_cbor = { version = "0.11", optional = true }
bincode = { version = "1.3", optional = true }
frunk = { version = "0.3", optional = true }
frunk = { version = "0.4", optional = true }
[dev-dependencies]
smart-default = "0.6.0"
rand = "0.8.3"
pretty_env_logger = "0.4.0"
lazy_static = "1.4.0"
tokio = { version = "1.2.0", features = ["fs", "rt-multi-thread", "macros"] }
tokio = { version = "1.8", features = ["fs", "rt-multi-thread", "macros"] }
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
rustdoc-args = ["--cfg", "docsrs", "-Znormalize-docs"]
rustc-args = ["--cfg", "dep_docsrs"]
[[test]]
name = "redis"

162
MIGRATION_GUIDE.md Normal file
View file

@ -0,0 +1,162 @@
This document describes breaking changes of `teloxide` crate, as well as the ways to update code.
Note that the list of required changes is not fully exhaustive and it may lack something in rare cases.
## 0.4 -> 0.5
### core
#### Field type changes
Types of some fields were changed to be more accurate.
If you used them, you may need to change types in your code too.
Example:
```diff
let ps: PhotoSize = /* ... */;
-let w: i32 = ps.width;
+let w: u32 = ps.width;
```
List of changed types:
- `PhotoSoze::width`: `i32` -> `u32`
- `PhotoSoze::height`: `i32` -> `u32`
- `Restricted::until_date`: `i32` -> `DateTime<Utc>`
- `Kicked::until_date` (`Banned::until_date`): `i32` -> `DateTime<Utc>`
- `PublicChatSupergroup::slow_mode_delay`: `Option<i32>` -> `Option<u32>`
- `User::id`: `i32` -> `i64` (note: all methods which are accepting `user_id` were changed too)
#### Method output types
In teloxide `v0.4` (core `v0.2`) some API methods had wrong return types.
This made them practically unusable as they've always returned parsing error.
On the offchance you were using the methods, you may need to adjust types in your code.
List of changed return types:
- `get_chat_administrators`: `ChatMember` -> `Vec<ChatMember>`
- `send_chat_action`: `Message` -> `True`
- `leave_chat`: `String` -> `True`
- `pin_chat_message`: `String` -> `True`
- `set_chat_description`: `String` -> `True`
- `set_chat_photo`: `String` -> `True`
- `set_chat_title`: `String` -> `True`
- `unpin_all_chat_messages`: `String` -> `True`
- `unpin_chat_message`: `String` -> `True`
#### Method parameter types
Some API methods accept different types now.
If you've used changed parameters, you need to adjust code for new types.
Examples:
```diff
let bot = Bot::new("TOKEN").auto_send();
-bot.set_webhook("url").await?;
+bot.set_webhook(Url::parse("url").unwrap()).await?;
let link = bot
.create_chat_invite_link(chat_id)
- .expire_date(timestamp)
# Note: this is not the only way to create `DateTime`. Refer to `chrono` docs for more.
+ .expire_date(DateTime::<Utc>::from_utc(
+ NaiveDateTime::from_timestamp(timestamp, 0), Utc)
+ )
.await?;
```
See also: [teloxide examples fixes](https://github.com/teloxide/teloxide/pull/408/files/369e43aa7ed1b192d326e6bdfe76f3560001353f..18f88cc034e97fd437c48930728c1d5d2da7a14d).
List of changed required params:
- `SetWebhook::url`: `String` -> `Url`
List of changed optional params:
- `AnswerCallbackQuery::url`: `String` -> `Url`
- `SendInvoice::photo_url`: `String` -> `Url`
- `CreateChatInviteLink::expire_date`: `i64` -> `DateTime<Utc>`
- `EditChatInviteLink::expire_date`: `i64` -> `DateTime<Utc>`
- `KickChatMember::until_date`: `u64` -> `DateTime<Utc>`
- `RestrictChatMember::until_date`: `u64` -> `DateTime<Utc>`
- `SendPoll::close_date`: `u64` -> `DateTime<Utc>`
#### Renamed items
Some items (fields, variants, types, methods) were renamed.
If you used them, you should start using new names.
Example:
```diff
-bot.send_chat_action(chat, ChatAction::RecordAudio).await?;
+bot.send_chat_action(chat, ChatAction::RecordVoice).await?;
-if chat_member.is_kicked() {
+if chat_member.is_banned() {
/* ... */
}
```
List of renamed items:
- `ChatAction::RecordAudio` -> `RecordVoice`
- `ChatAction::UploadAudio` -> `UploadVoice`
- `ChatMemberKind::Creator` -> `Owner`
- `ChatMemberKind::Kicked` -> `Banned`
- `Creator` -> `Owner`
- `Kicked` -> `Banned`
- `ChatMemberKind::is_Creator` -> `is_owner` *
- `ChatMemberKind::is_kicked` -> `is_banned` *
- `ChatMemberStatus::Creator` -> `Owner`
- `ChatMemberStatus::Kicked` -> `Banned`
- `kick_chat_member` -> `ban_chat_member` *
- `get_chat_members_count` -> `get_chat_member_count` *
\* Old methods are still accessible, but deprecated
#### Added `impl Clone` for {`CacheMe`, `DefaultParseMode`, `Throttle`}
Previously said bot adaptors were lacking `Clone` implementation.
To workaround this issue it was proposed to wrap bot in `Arc`.
Now it's not required, so you can remove the `Arc`:
```diff
let bot = Bot::new(token).parse_mode(ParseMode::MarkdownV2);
-let bot = Arc::new(bot);
```
### teloxide
#### Mutable reference for dispatching
`Dispatcher::dispatch` and `Dispatcher::dispatch_with_listener` now require mutable (unique) reference to self.
If you've used variable to store `Dispatcher`, you need to make it mutable:
```diff
-let dp = Dispatcher::new();
+let mut dp = Dispatcher::new();
/* ... */
dp.dispatch();
```
#### Listener refactor
`UpdateListener` trait was refactored.
If you've used `polling`/`polling_default` provided by teloxide, no changes are required.
If, however, you've used or implemented `UpdateListener` directly or used a `Stream` as a listener,
then you need to refactor your code too.
See also: [teloxide examples fixes](https://github.com/teloxide/teloxide/pull/385/files/8785b8263cb4caebf212e2a66a19f73e653eb060..c378d6ef4e524da96718beec6f989e8ac51d1531).
#### `polling_default`
`polling_default` is now async, but removes webhook.
Example fix:
```diff
-let listener = polling_default(bot);
+let listener = polling_default(bot).await;
```

View file

@ -1,3 +1,5 @@
[_v0.4.0 => v0.5.0 migration guide >>_](MIGRATION_GUIDE.md#04---05)
<div align="center">
<img src="ICON.png" width="250"/>
<h1>teloxide</h1>
@ -14,7 +16,7 @@
<img src="https://img.shields.io/crates/v/teloxide.svg">
</a>
<a href="https://core.telegram.org/bots/api">
<img src="https://img.shields.io/badge/API coverage-Up to 5.1 (inclusively)-green.svg">
<img src="https://img.shields.io/badge/API coverage-Up to 5.3 (inclusively)-green.svg">
</a>
<a href="https://t.me/teloxide">
<img src="https://img.shields.io/badge/official%20chat-t.me%2Fteloxide-blueviolet">
@ -23,19 +25,6 @@
A full-featured framework that empowers you to easily build [Telegram bots](https://telegram.org/blog/bot-revolution) using the [`async`/`.await`](https://rust-lang.github.io/async-book/01_getting_started/01_chapter.html) syntax in [Rust](https://www.rust-lang.org/). It handles all the difficult stuff so you can focus only on your business logic.
</div>
## Table of contents
- [Highlights](#highlights)
- [Setting up your environment](#setting-up-your-environment)
- [API overview](#api-overview)
- [The dices bot](#the-dices-bot)
- [Commands](#commands)
- [Dialogues management](#dialogues-management)
- [Recommendations](#recommendations)
- [Cargo features](#cargo-features)
- [FAQ](#faq)
- [Community bots](#community-bots)
- [Contributing](#contributing)
## Highlights
- **Functional reactive design.** teloxide follows [functional reactive design], allowing you to declaratively manipulate streams of updates from Telegram using filters, maps, folds, zips, and a lot of [other adaptors].
@ -79,10 +68,10 @@ $ rustup override set nightly
5. Run `cargo new my_bot`, enter the directory and put these lines into your `Cargo.toml`:
```toml
[dependencies]
teloxide = "0.4"
teloxide = { version = "0.4", features = ["auto-send", "macros"] }
log = "0.4.8"
pretty_env_logger = "0.4.0"
tokio = { version = "1.3", features = ["rt-threaded", "macros"] }
tokio = { version = "1.3", features = ["rt-multi-thread", "macros"] }
```
## API overview
@ -147,7 +136,7 @@ async fn answer(
command: Command,
) -> Result<(), Box<dyn Error + Send + Sync>> {
match command {
Command::Help => cx.answer(Command::descriptions()).send().await?,
Command::Help => cx.answer(Command::descriptions()).await?,
Command::Username(username) => {
cx.answer(format!("Your username is @{}.", username)).await?
}
@ -178,7 +167,7 @@ async fn main() {
</div>
### Dialogues management
A dialogue is described by an enumeration where each variant is one of possible dialogue's states. There are also _subtransition functions_, which turn a dialogue from one state to another, thereby forming a [FSM].
A dialogue is described by an enumeration where each variant is one of possible dialogue's states. There are also _subtransition functions_, which turn a dialogue from one state to another, thereby forming an [FSM].
[FSM]: https://en.wikipedia.org/wiki/Finite-state_machine
@ -378,29 +367,6 @@ async fn handle_message(
The second one produces very strange compiler messages due to the `#[tokio::main]` macro. However, the examples in this README use the second variant for brevity.
## Cargo features
- `redis-storage` -- enables the [Redis] support.
- `sqlite-storage` -- enables the [Sqlite] support.
- `cbor-serializer` -- enables the [CBOR] serializer for dialogues.
- `bincode-serializer` -- enables the [Bincode] serializer for dialogues.
- `frunk` -- enables [`teloxide::utils::UpState`], which allows mapping from a structure of `field1, ..., fieldN` to a structure of `field1, ..., fieldN, fieldN+1`.
- `macros` -- re-exports macros from [`teloxide-macros`].
- `native-tls` -- enables the [`native-tls`] TLS implementation (enabled by default).
- `rustls` -- enables the [`rustls`] TLS implementation.
- `auto-send` -- enables `AutoSend` bot adaptor.
- `cache-me` -- enables the `CacheMe` bot adaptor.
- `full` -- enables all the features except `nightly`.
- `nightly` -- enables nightly-only features (see the [teloxide-core's features]).
[CBOR]: https://en.wikipedia.org/wiki/CBOR
[Bincode]: https://github.com/servo/bincode
[`teloxide::utils::UpState`]: https://docs.rs/teloxide/latest/teloxide/utils/trait.UpState.html
[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros
[`native-tls`]: https://docs.rs/native-tls
[`rustls`]: https://docs.rs/rustls
[teloxide-core's features]: https://docs.rs/teloxide-core/0.2.1/teloxide_core/#cargo-features
## FAQ
**Q: Where I can ask questions?**
@ -443,15 +409,21 @@ A: Yes. You can setup any logger, for example, [fern], e.g. teloxide has no spec
[`enable_logging_with_filter!`]: https://docs.rs/teloxide/latest/teloxide/macro.enable_logging_with_filter.html
## Community bots
Feel free to push your own bot into our collection!
Feel free to propose your own bot to our collection!
- [_steadylearner/subreddit_reader_](https://github.com/steadylearner/Rust-Full-Stack/tree/master/commits/teloxide/subreddit_reader)
- [_ArtHome12/vzmuinebot -- Telegram bot for food menu navigate_](https://github.com/ArtHome12/vzmuinebot)
- [_Hermitter/tepe -- A CLI to command a bot to send messages and files over Telegram_](https://github.com/Hermitter/tepe)
- [_ArtHome12/cognito_bot -- The bot is designed to anonymize messages to a group_](https://github.com/ArtHome12/cognito_bot)
- [_GoldsteinE/tg-vimhelpbot -- Link `:help` for Vim in Telegram_](https://github.com/GoldsteinE/tg-vimhelpbot)
- [_sschiz/janitor-bot_ -- A bot that removes users trying to join to a chat that is designed for comments](https://github.com/sschiz/janitor-bot)
- [ myblackbeard/basketball-betting-bot -- The bot lets you bet on NBA games against your buddies](https://github.com/myblackbeard/basketball-betting-bot)
- [dracarys18/grpmr-rs](https://github.com/dracarys18/grpmr-rs) -- A telegram group manager bot with variety of extra features.
- [steadylearner/subreddit_reader](https://github.com/steadylearner/Rust-Full-Stack/tree/master/commits/teloxide/subreddit_reader) -- A bot that shows the latest posts at Rust subreddit.
- [ArtHome12/vzmuinebot](https://github.com/ArtHome12/vzmuinebot) -- Telegram bot for food menu navigate.
- [ArtHome12/cognito_bot](https://github.com/ArtHome12/cognito_bot) -- The bot is designed to anonymize messages to a group.
- [Hermitter/tepe](https://github.com/Hermitter/tepe) -- A CLI to command a bot to send messages and files over Telegram.
- [pro-vim/tg-vimhelpbot](https://github.com/pro-vim/tg-vimhelpbot) -- Link `:help` for Vim in Telegram.
- [sschiz/janitor-bot](https://github.com/sschiz/janitor-bot) -- A bot that removes users trying to join to a chat that is designed for comments.
- [myblackbeard/basketball-betting-bot](https://github.com/myblackbeard/basketball-betting-bot) -- The bot lets you bet on NBA games against your buddies.
- [slondr/BeerHolderBot](https://gitlab.com/slondr/BeerHolderBot) -- A bot that holds your beer.
- [mxseev/logram](https://github.com/mxseev/logram) -- Utility that takes logs from anywhere and sends them to Telegram.
- [msfjarvis/walls-bot-rs](https://github.com/msfjarvis/walls-bot-rs) -- Telegram bot for my wallpapers collection, in Rust.
- [MustafaSalih1993/Miss-Vodka-Telegram-Bot](https://github.com/MustafaSalih1993/Miss-Vodka-Telegram-Bot) -- A telegram bot written in rust using "Teloxide" library.
- [x13a/tg-prompt](https://github.com/x13a/tg-prompt) -- Telegram prompt.
## Contributing
See [CONRIBUTING.md](https://github.com/teloxide/teloxide/blob/master/CONTRIBUTING.md).

View file

@ -11,6 +11,7 @@ teloxide = { path = "../../", features = ["macros", "auto-send"] }
log = "0.4.8"
pretty_env_logger = "0.4.0"
tokio = { version = "1.3.0", features = ["rt-multi-thread", "macros"] }
chrono = "0.4"
[profile.release]
lto = true

View file

@ -1,8 +1,7 @@
use std::{convert::TryInto, error::Error, str::FromStr};
use std::{error::Error, str::FromStr};
use teloxide::{prelude::*, utils::command::BotCommand};
use teloxide::types::ChatPermissions;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use teloxide::{prelude::*, types::{ChatPermissions, Me}, utils::command::BotCommand};
// Derive BotCommand to parse text with a command into this enumeration.
//
@ -24,12 +23,12 @@ enum Command {
Kick,
#[command(description = "ban user in chat.")]
Ban {
time: u32,
time: u64,
unit: UnitOfTime,
},
#[command(description = "mute user in chat.")]
Mute {
time: u32,
time: u64,
unit: UnitOfTime,
},
Help,
@ -54,18 +53,18 @@ impl FromStr for UnitOfTime {
}
// Calculates time of user restriction.
fn calc_restrict_time(time: u32, unit: UnitOfTime) -> u32 {
fn calc_restrict_time(time: u64, unit: UnitOfTime) -> Duration {
match unit {
UnitOfTime::Hours => time * 3600,
UnitOfTime::Minutes => time * 60,
UnitOfTime::Seconds => time,
UnitOfTime::Hours => Duration::hours(time as i64),
UnitOfTime::Minutes => Duration::minutes(time as i64),
UnitOfTime::Seconds => Duration::seconds(time as i64),
}
}
type Cx = UpdateWithCx<AutoSend<Bot>, Message>;
// Mute a user with a replied message.
async fn mute_user(cx: &Cx, time: u32) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn mute_user(cx: &Cx, time: Duration) -> Result<(), Box<dyn Error + Send + Sync>> {
match cx.update.reply_to_message() {
Some(msg1) => {
cx.requester
@ -74,7 +73,12 @@ async fn mute_user(cx: &Cx, time: u32) -> Result<(), Box<dyn Error + Send + Sync
msg1.from().expect("Must be MessageKind::Common").id,
ChatPermissions::default(),
)
.until_date((cx.update.date + time as i32).try_into().unwrap())
.until_date(
DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(cx.update.date as i64, 0),
Utc,
) + time,
)
.await?;
}
None => {
@ -102,7 +106,7 @@ async fn kick_user(cx: &Cx) -> Result<(), Box<dyn Error + Send + Sync>> {
}
// Ban a user with replied message.
async fn ban_user(cx: &Cx, time: u32) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn ban_user(cx: &Cx, time: Duration) -> Result<(), Box<dyn Error + Send + Sync>> {
match cx.update.reply_to_message() {
Some(message) => {
cx.requester
@ -110,7 +114,12 @@ async fn ban_user(cx: &Cx, time: u32) -> Result<(), Box<dyn Error + Send + Sync>
cx.update.chat_id(),
message.from().expect("Must be MessageKind::Common").id,
)
.until_date((cx.update.date + time as i32).try_into().unwrap())
.until_date(
DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(cx.update.date as i64, 0),
Utc,
) + time,
)
.await?;
}
None => {
@ -142,6 +151,7 @@ async fn run() {
let bot = Bot::from_env().auto_send();
let bot_name: String = panic!("Your bot's name here");
let Me { user: bot_user, .. } = bot.get_me().await.unwrap();
let bot_name = bot_user.username.expect("Bots must have usernames");
teloxide::commands_repl(bot, bot_name, action).await;
}

View file

@ -16,8 +16,8 @@ log = "0.4.8"
pretty_env_logger = "0.4.0"
derive_more = "0.99.9"
frunk = "0.3.1"
frunk_core = "0.3.1"
frunk = "0.4"
frunk_core = "0.4"
[profile.release]
lto = true

View file

@ -6,7 +6,7 @@ use crate::dialogue::states::{
use derive_more::From;
use teloxide::macros::Transition;
#[derive(Transition, From)]
#[derive(Transition, Clone, From)]
pub enum Dialogue {
Start(StartState),
ReceiveFullName(ReceiveFullNameState),

View file

@ -1,7 +1,7 @@
use crate::dialogue::{states::receive_location::ReceiveLocationState, Dialogue};
use teloxide::prelude::*;
#[derive(Generic)]
#[derive(Clone, Generic)]
pub struct ReceiveAgeState {
pub full_name: String,
}

View file

@ -1,7 +1,7 @@
use crate::dialogue::{states::receive_age::ReceiveAgeState, Dialogue};
use teloxide::prelude::*;
#[derive(Generic)]
#[derive(Clone, Generic)]
pub struct ReceiveFullNameState;
#[teloxide(subtransition)]

View file

@ -1,7 +1,7 @@
use crate::dialogue::Dialogue;
use teloxide::prelude::*;
#[derive(Generic)]
#[derive(Clone, Generic)]
pub struct ReceiveLocationState {
pub full_name: String,
pub age: u8,

View file

@ -1,6 +1,7 @@
use crate::dialogue::{states::ReceiveFullNameState, Dialogue};
use teloxide::prelude::*;
#[derive(Clone)]
pub struct StartState;
#[teloxide(subtransition)]

View file

@ -1,14 +1,14 @@
// The version of Heroku ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling.
use teloxide::{dispatching::update_listeners, prelude::*, types::Update};
use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use std::{convert::Infallible, env, net::SocketAddr};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::Filter;
use reqwest::StatusCode;
use reqwest::{StatusCode, Url};
#[tokio::main]
async fn main() {
@ -20,8 +20,8 @@ async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, In
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// Heroku defines auto defines a port value
pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
// Heroku auto defines a port value
let teloxide_token = env::var("TELOXIDE_TOKEN").expect("TELOXIDE_TOKEN env variable missing");
let port: u16 = env::var("PORT")
.expect("PORT env variable missing")
@ -30,7 +30,7 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
// Heroku host example .: "heroku-ping-pong-bot.herokuapp.com"
let host = env::var("HOST").expect("have HOST env variable");
let path = format!("bot{}", teloxide_token);
let url = format!("https://{}/{}", host, path);
let url = Url::parse(&format!("https://{}/{}", host, path)).unwrap();
bot.set_webhook(url).await.expect("Cannot setup a webhook");
@ -48,11 +48,21 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
})
.recover(handle_rejection);
let serve = warp::serve(server);
let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let address = format!("0.0.0.0:{}", port);
tokio::spawn(serve.run(address.parse::<SocketAddr>().unwrap()));
UnboundedReceiverStream::new(rx)
let addr = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();
let server = warp::serve(server);
let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate.
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone())
}
async fn run() {

View file

@ -1,14 +1,14 @@
// The version of ngrok ping-pong-bot, which uses a webhook to receive updates
// from Telegram, instead of long polling.
use teloxide::{dispatching::update_listeners, prelude::*, types::Update};
use teloxide::{dispatching::{update_listeners::{self, StatefulListener}, stop_token::AsyncStopToken}, prelude::*, types::Update};
use std::{convert::Infallible, net::SocketAddr};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::Filter;
use reqwest::StatusCode;
use reqwest::{StatusCode, Url};
#[tokio::main]
async fn main() {
@ -20,10 +20,12 @@ async fn handle_rejection(error: warp::Rejection) -> Result<impl warp::Reply, In
Ok(StatusCode::INTERNAL_SERVER_ERROR)
}
pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListener<Infallible> {
let url = Url::parse("Your HTTPS ngrok URL here. Get it by `ngrok http 80`").unwrap();
// You might want to specify a self-signed certificate via .certificate
// method on SetWebhook.
bot.set_webhook("Your HTTPS ngrok URL here. Get it by 'ngrok http 80'")
bot.set_webhook(url)
.await
.expect("Cannot setup a webhook");
@ -40,13 +42,21 @@ pub async fn webhook<'a>(bot: AutoSend<Bot>) -> impl update_listeners::UpdateLis
})
.recover(handle_rejection);
let serve = warp::serve(server);
let (stop_token, stop_flag) = AsyncStopToken::new_pair();
let addr = "127.0.0.1:80".parse::<SocketAddr>().unwrap();
let server = warp::serve(server);
let (_addr, fut) = server.bind_with_graceful_shutdown(addr, stop_flag);
// You might want to use serve.key_path/serve.cert_path methods here to
// setup a self-signed TLS certificate.
tokio::spawn(serve.run("127.0.0.1:80".parse::<SocketAddr>().unwrap()));
UnboundedReceiverStream::new(rx)
tokio::spawn(fut);
let stream = UnboundedReceiverStream::new(rx);
fn streamf<S, T>(state: &mut (S, T)) -> &mut S { &mut state.0 }
StatefulListener::new((stream, stop_token), streamf, |state: &mut (_, AsyncStopToken)| state.1.clone())
}
async fn run() {

View file

@ -18,7 +18,7 @@ async fn answer(
command: Command,
) -> Result<(), Box<dyn Error + Send + Sync>> {
match command {
Command::Help => cx.answer(Command::descriptions()).send().await?,
Command::Help => cx.answer(Command::descriptions()).await?,
Command::Username(username) => {
cx.answer(format!("Your username is @{}.", username)).await?
}

View file

@ -1,6 +1,6 @@
[build]
command = "rustup install nightly --profile minimal && cargo +nightly doc --all-features --no-deps && cp -r target/doc _netlify_out"
environment = { RUSTDOCFLAGS= "--cfg docsrs" }
environment = { RUSTFLAGS="--cfg dep_docsrs", RUSTDOCFLAGS= "--cfg docsrs -Znormalize-docs" }
publish = "_netlify_out"
[[redirects]]

View file

@ -4,12 +4,13 @@ use crate::dispatching::{
},
DispatcherHandler, UpdateWithCx,
};
use std::{convert::Infallible, marker::PhantomData};
use std::{fmt::Debug, marker::PhantomData};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use tokio::sync::mpsc;
use lockfree::map::Map;
use crate::dispatching::dialogue::InMemStorageError;
use flurry::HashMap;
use std::sync::{Arc, Mutex};
use teloxide_core::requests::Requester;
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -19,6 +20,11 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
/// Note that it implements [`DispatcherHandler`], so you can just put an
/// instance of this dispatcher into the [`Dispatcher`]'s methods.
///
/// Note that when the storage methods [`Storage::remove_dialogue`] and
/// [`Storage::update_dialogue`] are failed, the errors are logged, but a result
/// from [`Storage::get_dialogue`] is provided to a user handler as-is so you
/// can respond to a concrete user with an error description.
///
/// See the [module-level documentation](crate::dispatching::dialogue) for the
/// design overview.
///
@ -35,12 +41,12 @@ pub struct DialogueDispatcher<R, D, S, H, Upd> {
/// A value is the TX part of an unbounded asynchronous MPSC channel. A
/// handler that executes updates from the same chat ID sequentially
/// handles the RX part.
senders: Arc<Map<i64, mpsc::UnboundedSender<UpdateWithCx<R, Upd>>>>,
senders: Arc<HashMap<i64, mpsc::UnboundedSender<UpdateWithCx<R, Upd>>>>,
}
impl<R, D, H, Upd> DialogueDispatcher<R, D, InMemStorage<D>, H, Upd>
where
H: DialogueDispatcherHandler<R, Upd, D, Infallible> + Send + Sync + 'static,
H: DialogueDispatcherHandler<R, Upd, D, InMemStorageError> + Send + Sync + 'static,
Upd: GetChatId + Send + 'static,
D: Default + Send + 'static,
{
@ -53,7 +59,7 @@ where
Self {
storage: InMemStorage::new(),
handler: Arc::new(handler),
senders: Arc::new(Map::new()),
senders: Arc::new(HashMap::new()),
_phantom: PhantomData,
}
}
@ -65,7 +71,7 @@ where
Upd: GetChatId + Send + 'static,
D: Default + Send + 'static,
S: Storage<D> + Send + Sync + 'static,
S::Error: Send + 'static,
S::Error: Debug + Send + 'static,
{
/// Creates a dispatcher with the specified `handler` and `storage`.
#[must_use]
@ -73,7 +79,7 @@ where
Self {
storage,
handler: Arc::new(handler),
senders: Arc::new(Map::new()),
senders: Arc::new(HashMap::new()),
_phantom: PhantomData,
}
}
@ -97,28 +103,24 @@ where
async move {
let chat_id = cx.update.chat_id();
let dialogue = Arc::clone(&storage)
.remove_dialogue(chat_id)
.await
.map(Option::unwrap_or_default);
let dialogue =
Arc::clone(&storage).get_dialogue(chat_id).await.map(Option::unwrap_or_default);
match handler.handle(DialogueWithCx { cx, dialogue }).await {
DialogueStage::Next(new_dialogue) => {
if let Ok(Some(_)) = storage.update_dialogue(chat_id, new_dialogue).await {
panic!(
"Oops, you have an bug in your Storage: update_dialogue returns \
Some after remove_dialogue"
);
if let Err(e) = storage.update_dialogue(chat_id, new_dialogue).await {
log::error!("Storage::update_dialogue failed: {:?}", e);
}
}
DialogueStage::Exit => {
// On the next .poll() call, the spawned future will
// return Poll::Ready, because we are dropping the
// sender right here:
senders.remove(&chat_id);
senders.pin().remove(&chat_id);
// We already removed a dialogue from `storage` (see
// the beginning of this async block).
if let Err(e) = storage.remove_dialogue(chat_id).await {
log::error!("Storage::remove_dialogue failed: {:?}", e);
}
}
}
}
@ -134,7 +136,7 @@ where
Upd: GetChatId + Send + 'static,
D: Default + Send + 'static,
S: Storage<D> + Send + Sync + 'static,
S::Error: Send + 'static,
S::Error: Debug + Send + 'static,
R: Requester + Send,
{
fn handle(
@ -151,10 +153,10 @@ where
let this = Arc::clone(&this);
let chat_id = cx.update.chat_id();
match this.senders.get(&chat_id) {
match this.senders.pin().get(&chat_id) {
// An old dialogue
Some(tx) => {
if tx.1.send(cx).is_err() {
if tx.send(cx).is_err() {
panic!("We are not dropping a receiver or call .close() on it",);
}
}
@ -163,7 +165,7 @@ where
if tx.send(cx).is_err() {
panic!("We are not dropping a receiver or call .close() on it",);
}
this.senders.insert(chat_id, tx);
this.senders.pin().insert(chat_id, tx);
}
}
@ -213,7 +215,7 @@ mod tests {
}
let dispatcher = DialogueDispatcher::new(
|cx: DialogueWithCx<Bot, MyUpdate, (), Infallible>| async move {
|cx: DialogueWithCx<Bot, MyUpdate, (), InMemStorageError>| async move {
tokio::time::sleep(Duration::from_millis(300)).await;
match cx.cx.update {

View file

@ -33,10 +33,17 @@
//! # #[cfg(feature = "macros")] {
//! use std::convert::Infallible;
//!
//! use teloxide::{dispatching::dialogue::Transition, prelude::*, teloxide, RequestError};
//! use teloxide::{
//! dispatching::dialogue::{InMemStorageError, Transition},
//! prelude::*,
//! teloxide, RequestError,
//! };
//!
//! #[derive(Clone)]
//! struct _1State;
//! #[derive(Clone)]
//! struct _2State;
//! #[derive(Clone)]
//! struct _3State;
//!
//! type Out = TransitionOut<D, RequestError>;
@ -56,7 +63,7 @@
//! todo!()
//! }
//!
//! #[derive(Transition)]
//! #[derive(Clone, Transition)]
//! enum D {
//! _1(_1State),
//! _2(_2State),
@ -69,7 +76,7 @@
//! }
//! }
//!
//! type In = DialogueWithCx<AutoSend<Bot>, Message, D, Infallible>;
//! type In = DialogueWithCx<AutoSend<Bot>, Message, D, InMemStorageError>;
//!
//! #[tokio::main]
//! async fn main() {
@ -168,4 +175,4 @@ pub use storage::{RedisStorage, RedisStorageError};
#[cfg(feature = "sqlite-storage")]
pub use storage::{SqliteStorage, SqliteStorageError};
pub use storage::{serializer, InMemStorage, Serializer, Storage, TraceStorage};
pub use storage::{serializer, InMemStorage, InMemStorageError, Serializer, Storage, TraceStorage};

View file

@ -1,18 +1,23 @@
use super::Storage;
use futures::future::BoxFuture;
use std::{collections::HashMap, sync::Arc};
use thiserror::Error;
use tokio::sync::Mutex;
/// A memory storage based on a hash map. Stores all the dialogues directly in
/// RAM.
/// An error returned from [`InMemStorage`].
#[derive(Debug, Error)]
pub enum InMemStorageError {
/// Returned from [`InMemStorage::remove_dialogue`].
#[error("row not found")]
DialogueNotFound,
}
/// A dialogue storage based on [`std::collections::HashMap`].
///
/// ## Note
/// All the dialogues will be lost after you restart your bot. If you need to
/// store them somewhere on a drive, you should use [`SqliteStorage`],
/// [`RedisStorage`] or implement your own.
///
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage
/// All your dialogues will be lost after you restart your bot. If you need to
/// store them somewhere on a drive, you should use e.g.
/// [`super::SqliteStorage`] or implement your own.
#[derive(Debug)]
pub struct InMemStorage<D> {
map: Mutex<HashMap<i64, D>>,
@ -25,27 +30,44 @@ impl<S> InMemStorage<S> {
}
}
impl<D> Storage<D> for InMemStorage<D> {
type Error = std::convert::Infallible;
impl<D> Storage<D> for InMemStorage<D>
where
D: Clone,
D: Send + 'static,
{
type Error = InMemStorageError;
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
fn remove_dialogue(self: Arc<Self>, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static,
{
Box::pin(async move { Ok(self.map.lock().await.remove(&chat_id)) })
Box::pin(async move {
self.map
.lock()
.await
.remove(&chat_id)
.map_or(Err(InMemStorageError::DialogueNotFound), |_| Ok(()))
})
}
fn update_dialogue(
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static,
{
Box::pin(async move { Ok(self.map.lock().await.insert(chat_id, dialogue)) })
Box::pin(async move {
self.map.lock().await.insert(chat_id, dialogue);
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move { Ok(self.map.lock().await.get(&chat_id).map(ToOwned::to_owned)) })
}
}

View file

@ -11,7 +11,10 @@ mod sqlite_storage;
use futures::future::BoxFuture;
pub use self::{in_mem_storage::InMemStorage, trace_storage::TraceStorage};
pub use self::{
in_mem_storage::{InMemStorage, InMemStorageError},
trace_storage::TraceStorage,
};
#[cfg(feature = "redis-storage")]
#[cfg_attr(all(docsrs, feature = "nightly"), doc(cfg(feature = "redis-storage")))]
@ -27,11 +30,14 @@ pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
/// You can implement this trait for a structure that communicates with a DB and
/// be sure that after you restart your bot, all the dialogues won't be lost.
///
/// `Storage` is used only to store dialogue states, i.e. it can't be used as a
/// generic database.
///
/// Currently we support the following storages out of the box:
///
/// - [`InMemStorage`] - a storage based on a simple hash map
/// - [`RedisStorage`] - a Redis-based storage
/// - [`SqliteStorage`] - an SQLite-based persistent storage
/// - [`InMemStorage`] -- a storage based on [`std::collections::HashMap`].
/// - [`RedisStorage`] -- a Redis-based storage.
/// - [`SqliteStorage`] -- an SQLite-based persistent storage.
///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
@ -39,26 +45,32 @@ pub use sqlite_storage::{SqliteStorage, SqliteStorageError};
pub trait Storage<D> {
type Error;
/// Removes a dialogue with the specified `chat_id`.
/// Removes a dialogue indexed by `chat_id`.
///
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
/// `dialogue` was deleted.
/// If the dialogue indexed by `chat_id` does not exist, this function
/// results in an error.
#[must_use = "Futures are lazy and do nothing unless polled with .await"]
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static;
/// Updates a dialogue with the specified `chat_id`.
///
/// Returns `None` if there wasn't such a dialogue, `Some(dialogue)` if a
/// `dialogue` was updated.
/// Updates a dialogue indexed by `chat_id` with `dialogue`.
#[must_use = "Futures are lazy and do nothing unless polled with .await"]
fn update_dialogue(
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static;
/// Returns the dialogue indexed by `chat_id`.
#[must_use = "Futures are lazy and do nothing unless polled with .await"]
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>;
}

View file

@ -1,6 +1,6 @@
use super::{serializer::Serializer, Storage};
use futures::future::BoxFuture;
use redis::{AsyncCommands, FromRedisValue, IntoConnectionInfo};
use redis::{AsyncCommands, IntoConnectionInfo};
use serde::{de::DeserializeOwned, Serialize};
use std::{
convert::Infallible,
@ -12,8 +12,6 @@ use thiserror::Error;
use tokio::sync::Mutex;
/// An error returned from [`RedisStorage`].
///
/// [`RedisStorage`]: struct.RedisStorage.html
#[derive(Debug, Error)]
pub enum RedisStorageError<SE>
where
@ -21,11 +19,16 @@ where
{
#[error("parsing/serializing error: {0}")]
SerdeError(SE),
#[error("error from Redis: {0}")]
RedisError(#[from] redis::RedisError),
/// Returned from [`RedisStorage::remove_dialogue`].
#[error("row not found")]
DialogueNotFound,
}
/// A memory storage based on [Redis](https://redis.io/).
/// A dialogue storage based on [Redis](https://redis.io/).
pub struct RedisStorage<S> {
conn: Mutex<redis::aio::Connection>,
serializer: S,
@ -51,35 +54,27 @@ where
{
type Error = RedisStorageError<<S as Serializer<D>>::Error>;
// `.del().ignore()` is much more readable than `.del()\n.ignore()`
#[rustfmt::skip]
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move {
let res = redis::pipe()
let deleted_rows_count = redis::pipe()
.atomic()
.get(chat_id)
.del(chat_id).ignore()
.query_async::<_, redis::Value>(
self.conn.lock().await.deref_mut(),
)
.del(chat_id)
.query_async::<_, redis::Value>(self.conn.lock().await.deref_mut())
.await?;
// We're expecting `.pipe()` to return us an exactly one result in
// bulk, so all other branches should be unreachable
match res {
redis::Value::Bulk(bulk) if bulk.len() == 1 => {
Ok(Option::<Vec<u8>>::from_redis_value(&bulk[0])?
.map(|v| {
self.serializer
.deserialize(&v)
.map_err(RedisStorageError::SerdeError)
})
.transpose()?)
if let redis::Value::Bulk(values) = deleted_rows_count {
if let redis::Value::Int(deleted_rows_count) = values[0] {
match deleted_rows_count {
0 => return Err(RedisStorageError::DialogueNotFound),
_ => return Ok(()),
}
}
_ => unreachable!(),
}
unreachable!("Must return redis::Value::Bulk(redis::Value::Int(_))");
})
}
@ -87,14 +82,24 @@ where
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move {
let dialogue =
self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?;
self.conn.lock().await.set::<_, Vec<u8>, _>(chat_id, dialogue).await?;
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
self.conn
.lock()
.await
.getset::<_, Vec<u8>, Option<Vec<u8>>>(chat_id, dialogue)
.get::<_, Option<Vec<u8>>>(chat_id)
.await?
.map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError))
.transpose()

View file

@ -1,4 +1,4 @@
//! Various serializers for memory storages.
//! Various serializers for dialogue storages.
use serde::{de::DeserializeOwned, ser::Serialize};

View file

@ -10,15 +10,13 @@ use std::{
};
use thiserror::Error;
/// A persistent storage based on [SQLite](https://www.sqlite.org/).
/// A persistent dialogue storage based on [SQLite](https://www.sqlite.org/).
pub struct SqliteStorage<S> {
pool: SqlitePool,
serializer: S,
}
/// An error returned from [`SqliteStorage`].
///
/// [`SqliteStorage`]: struct.SqliteStorage.html
#[derive(Debug, Error)]
pub enum SqliteStorageError<SE>
where
@ -26,8 +24,13 @@ where
{
#[error("dialogue serialization error: {0}")]
SerdeError(SE),
#[error("sqlite error: {0}")]
SqliteError(#[from] sqlx::Error),
/// Returned from [`SqliteStorage::remove_dialogue`].
#[error("row not found")]
DialogueNotFound,
}
impl<S> SqliteStorage<S> {
@ -60,23 +63,24 @@ where
{
type Error = SqliteStorageError<<S as Serializer<D>>::Error>;
/// Returns [`sqlx::Error::RowNotFound`] if a dialogue does not exist.
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move {
Ok(match get_dialogue(&self.pool, chat_id).await? {
Some(d) => {
let prev_dialogue =
self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError)?;
sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?")
.bind(chat_id)
.execute(&self.pool)
.await?;
Some(prev_dialogue)
}
_ => None,
})
let deleted_rows_count =
sqlx::query("DELETE FROM teloxide_dialogues WHERE chat_id = ?")
.bind(chat_id)
.execute(&self.pool)
.await?
.rows_affected();
if deleted_rows_count == 0 {
return Err(SqliteStorageError::DialogueNotFound);
}
Ok(())
})
}
@ -84,14 +88,10 @@ where
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move {
let prev_dialogue = get_dialogue(&self.pool, chat_id)
.await?
.map(|d| self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError))
.transpose()?;
let upd_dialogue =
self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?;
let d = self.serializer.serialize(&dialogue).map_err(SqliteStorageError::SerdeError)?;
self.pool
.acquire()
.await?
@ -103,28 +103,39 @@ where
"#,
)
.bind(chat_id)
.bind(upd_dialogue),
.bind(d),
)
.await?;
Ok(prev_dialogue)
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
get_dialogue(&self.pool, chat_id)
.await?
.map(|d| self.serializer.deserialize(&d).map_err(SqliteStorageError::SerdeError))
.transpose()
})
}
}
#[derive(sqlx::FromRow)]
struct DialogueDbRow {
dialogue: Vec<u8>,
}
async fn get_dialogue(pool: &SqlitePool, chat_id: i64) -> Result<Option<Vec<u8>>, sqlx::Error> {
#[derive(sqlx::FromRow)]
struct DialogueDbRow {
dialogue: Vec<u8>,
}
async fn get_dialogue(
pool: &SqlitePool,
chat_id: i64,
) -> Result<Option<Box<Vec<u8>>>, sqlx::Error> {
Ok(sqlx::query_as::<_, DialogueDbRow>(
let bytes = sqlx::query_as::<_, DialogueDbRow>(
"SELECT dialogue FROM teloxide_dialogues WHERE chat_id = ?",
)
.bind(chat_id)
.fetch_optional(pool)
.await?
.map(|r| Box::new(r.dialogue)))
.map(|r| r.dialogue);
Ok(bytes)
}

View file

@ -5,14 +5,13 @@ use std::{
};
use futures::future::BoxFuture;
use log::{log_enabled, trace, Level::Trace};
use crate::dispatching::dialogue::Storage;
/// Storage wrapper for logging purposes
/// A dialogue storage wrapper which logs all actions performed on an underlying
/// storage.
///
/// Reports about any dialogue update or removal action on `trace` level
/// using `log` crate.
/// Reports about any dialogue action via [`log::Level::Trace`].
pub struct TraceStorage<S> {
inner: Arc<S>,
}
@ -35,14 +34,11 @@ where
{
type Error = <S as Storage<D>>::Error;
fn remove_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
fn remove_dialogue(self: Arc<Self>, chat_id: i64) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static,
{
trace!("Removing dialogue with {}", chat_id);
log::trace!("Removing dialogue #{}", chat_id);
<S as Storage<D>>::remove_dialogue(self.inner.clone(), chat_id)
}
@ -50,21 +46,23 @@ where
self: Arc<Self>,
chat_id: i64,
dialogue: D,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>>
) -> BoxFuture<'static, Result<(), Self::Error>>
where
D: Send + 'static,
{
if log_enabled!(Trace) {
Box::pin(async move {
let to = format!("{:#?}", dialogue);
let from =
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue)
.await?;
trace!("Updated dialogue with {}, {:#?} -> {}", chat_id, from, to);
Ok(from)
})
} else {
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue)
}
Box::pin(async move {
let to = format!("{:#?}", dialogue);
<S as Storage<D>>::update_dialogue(self.inner.clone(), chat_id, dialogue).await?;
log::trace!("Updated a dialogue #{}: {:#?}", chat_id, to);
Ok(())
})
}
fn get_dialogue(
self: Arc<Self>,
chat_id: i64,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
log::trace!("Requested a dialogue #{}", chat_id);
<S as Storage<D>>::get_dialogue(self.inner.clone(), chat_id)
}
}

View file

@ -1,48 +1,37 @@
use std::{
fmt::{self, Debug},
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
time::Duration,
};
use crate::{
dispatching::{
update_listeners, update_listeners::UpdateListener, DispatcherHandler, UpdateWithCx,
stop_token::StopToken,
update_listeners::{self, UpdateListener},
DispatcherHandler, UpdateWithCx,
},
error_handlers::{ErrorHandler, LoggingErrorHandler},
};
use futures::StreamExt;
use std::{fmt::Debug, sync::Arc};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use teloxide_core::{
requests::Requester,
types::{
CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, Poll,
PollAnswer, PreCheckoutQuery, ShippingQuery, UpdateKind,
AllowedUpdate, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message,
Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind,
},
};
use tokio::sync::mpsc;
use tokio::{
sync::{mpsc, Notify},
task::JoinHandle,
time::timeout,
};
type Tx<Upd, R> = Option<mpsc::UnboundedSender<UpdateWithCx<Upd, R>>>;
#[macro_use]
mod macros {
/// Pushes an update to a queue.
macro_rules! send {
($requester:expr, $tx:expr, $update:expr, $variant:expr) => {
send($requester, $tx, $update, stringify!($variant));
};
}
}
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
}
}
/// One dispatcher to rule them all.
///
/// See the [module-level documentation](crate::dispatching) for the design
@ -63,6 +52,11 @@ pub struct Dispatcher<R> {
poll_answers_queue: Tx<R, PollAnswer>,
my_chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_members_queue: Tx<R, ChatMemberUpdated>,
running_handlers: FuturesUnordered<JoinHandle<()>>,
state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
}
impl<R> Dispatcher<R>
@ -87,25 +81,48 @@ where
poll_answers_queue: None,
my_chat_members_queue: None,
chat_members_queue: None,
running_handlers: FuturesUnordered::new(),
state: <_>::default(),
shutdown_notify_back: <_>::default(),
}
}
#[must_use]
#[allow(clippy::unnecessary_wraps)]
fn new_tx<H, Upd>(&self, h: H) -> Tx<R, Upd>
fn new_tx<H, Upd>(&mut self, h: H) -> Tx<R, Upd>
where
H: DispatcherHandler<R, Upd> + Send + 'static,
Upd: Send + 'static,
R: Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let fut = h.handle(rx);
fut.await;
});
let join_handle = tokio::spawn(h.handle(rx));
self.running_handlers.push(join_handle);
Some(tx)
}
/// Setup the `^C` handler which [`shutdown`]s dispatching.
///
/// [`shutdown`]: ShutdownToken::shutdown
#[cfg(feature = "ctrlc_handler")]
#[cfg_attr(docsrs, doc(cfg(feature = "ctrlc_handler")))]
pub fn setup_ctrlc_handler(self) -> Self {
let state = Arc::clone(&self.state);
tokio::spawn(async move {
loop {
tokio::signal::ctrl_c().await.expect("Failed to listen for ^C");
log::info!("^C received, trying to shutdown the dispatcher...");
// If dispatcher wasn't running, then there is nothing to do
shutdown_inner(&state).ok();
}
});
self
}
#[must_use]
pub fn messages_handler<H>(mut self, h: H) -> Self
where
@ -227,23 +244,39 @@ where
///
/// The default parameters are a long polling update listener and log all
/// errors produced by this listener).
pub async fn dispatch(&self)
///
/// Please note that after shutting down (either because of [`shutdown`],
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch(&mut self)
where
R: Requester + Clone,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
self.dispatch_with_listener(
update_listeners::polling_default(self.requester.clone()),
LoggingErrorHandler::with_custom_text("An error from the update listener"),
)
.await;
let listener = update_listeners::polling_default(self.requester.clone()).await;
let error_handler =
LoggingErrorHandler::with_custom_text("An error from the update listener");
self.dispatch_with_listener(listener, error_handler).await;
}
/// Starts your bot with custom `update_listener` and
/// `update_listener_error_handler`.
///
/// Please note that after shutting down (either because of [`shutdown`],
/// [a ctrlc signal], or [`UpdateListener`] returning `None`) all handlers
/// will be gone. As such, to restart listening you need to re-add
/// handlers.
///
/// [`shutdown`]: ShutdownToken::shutdown
/// [a ctrlc signal]: Dispatcher::setup_ctrlc_handler
pub async fn dispatch_with_listener<'a, UListener, ListenerE, Eh>(
&'a self,
update_listener: UListener,
&'a mut self,
mut update_listener: UListener,
update_listener_error_handler: Arc<Eh>,
) where
UListener: UpdateListener<ListenerE> + 'a,
@ -251,126 +284,365 @@ where
ListenerE: Debug,
R: Requester + Clone,
{
let update_listener = Box::pin(update_listener);
use ShutdownState::*;
update_listener
.for_each(move |update| {
let update_listener_error_handler = Arc::clone(&update_listener_error_handler);
self.hint_allowed_updates(&mut update_listener);
async move {
log::trace!("Dispatcher received an update: {:?}", update);
let shutdown_check_timeout = shutdown_check_timeout_for(&update_listener);
let mut stop_token = Some(update_listener.stop_token());
let update = match update {
Ok(update) => update,
Err(error) => {
Arc::clone(&update_listener_error_handler).handle_error(error).await;
return;
}
};
if let Err(actual) = self.state.compare_exchange(Idle, Running) {
unreachable!(
"Dispatching is already running: expected `{:?}` state, found `{:?}`",
Idle, actual
);
}
match update.kind {
UpdateKind::Message(message) => {
send!(
&self.requester,
&self.messages_queue,
message,
UpdateKind::Message
);
}
UpdateKind::EditedMessage(message) => {
send!(
&self.requester,
&self.edited_messages_queue,
message,
UpdateKind::EditedMessage
);
}
UpdateKind::ChannelPost(post) => {
send!(
&self.requester,
&self.channel_posts_queue,
post,
UpdateKind::ChannelPost
);
}
UpdateKind::EditedChannelPost(post) => {
send!(
&self.requester,
&self.edited_channel_posts_queue,
post,
UpdateKind::EditedChannelPost
);
}
UpdateKind::InlineQuery(query) => {
send!(
&self.requester,
&self.inline_queries_queue,
query,
UpdateKind::InlineQuery
);
}
UpdateKind::ChosenInlineResult(result) => {
send!(
&self.requester,
&self.chosen_inline_results_queue,
result,
UpdateKind::ChosenInlineResult
);
}
UpdateKind::CallbackQuery(query) => {
send!(
&self.requester,
&self.callback_queries_queue,
query,
UpdateKind::CallbackQuer
);
}
UpdateKind::ShippingQuery(query) => {
send!(
&self.requester,
&self.shipping_queries_queue,
query,
UpdateKind::ShippingQuery
);
}
UpdateKind::PreCheckoutQuery(query) => {
send!(
&self.requester,
&self.pre_checkout_queries_queue,
query,
UpdateKind::PreCheckoutQuery
);
}
UpdateKind::Poll(poll) => {
send!(&self.requester, &self.polls_queue, poll, UpdateKind::Poll);
}
UpdateKind::PollAnswer(answer) => {
send!(
&self.requester,
&self.poll_answers_queue,
answer,
UpdateKind::PollAnswer
);
}
UpdateKind::MyChatMember(chat_member_updated) => {
send!(
&self.requester,
&self.my_chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
UpdateKind::ChatMember(chat_member_updated) => {
send!(
&self.requester,
&self.chat_members_queue,
chat_member_updated,
UpdateKind::MyChatMember
);
}
{
let stream = update_listener.as_stream();
tokio::pin!(stream);
loop {
if let Ok(upd) = timeout(shutdown_check_timeout, stream.next()).await {
match upd {
None => break,
Some(upd) => self.process_update(upd, &update_listener_error_handler).await,
}
}
})
.await
if let ShuttingDown = self.state.load() {
if let Some(token) = stop_token.take() {
log::debug!("Start shutting down dispatching...");
token.stop();
}
}
}
}
self.wait_for_handlers().await;
if let ShuttingDown = self.state.load() {
// Stopped because of a `shutdown` call.
// Notify `shutdown`s that we finished
self.shutdown_notify_back.notify_waiters();
log::info!("Dispatching has been shut down.");
} else {
log::info!("Dispatching has been stopped (listener returned `None`).");
}
self.state.store(Idle);
}
/// Returns a shutdown token, which can later be used to shutdown
/// dispatching.
pub fn shutdown_token(&self) -> ShutdownToken {
ShutdownToken {
dispatcher_state: Arc::clone(&self.state),
shutdown_notify_back: Arc::clone(&self.shutdown_notify_back),
}
}
async fn process_update<ListenerE, Eh>(
&self,
update: Result<Update, ListenerE>,
update_listener_error_handler: &Arc<Eh>,
) where
R: Requester + Clone,
Eh: ErrorHandler<ListenerE>,
ListenerE: Debug,
{
{
log::trace!("Dispatcher received an update: {:?}", update);
let update = match update {
Ok(update) => update,
Err(error) => {
Arc::clone(update_listener_error_handler).handle_error(error).await;
return;
}
};
match update.kind {
UpdateKind::Message(message) => {
send(&self.requester, &self.messages_queue, message, "UpdateKind::Message")
}
UpdateKind::EditedMessage(message) => send(
&self.requester,
&self.edited_messages_queue,
message,
"UpdateKind::EditedMessage",
),
UpdateKind::ChannelPost(post) => send(
&self.requester,
&self.channel_posts_queue,
post,
"UpdateKind::ChannelPost",
),
UpdateKind::EditedChannelPost(post) => send(
&self.requester,
&self.edited_channel_posts_queue,
post,
"UpdateKind::EditedChannelPost",
),
UpdateKind::InlineQuery(query) => send(
&self.requester,
&self.inline_queries_queue,
query,
"UpdateKind::InlineQuery",
),
UpdateKind::ChosenInlineResult(result) => send(
&self.requester,
&self.chosen_inline_results_queue,
result,
"UpdateKind::ChosenInlineResult",
),
UpdateKind::CallbackQuery(query) => send(
&self.requester,
&self.callback_queries_queue,
query,
"UpdateKind::CallbackQuer",
),
UpdateKind::ShippingQuery(query) => send(
&self.requester,
&self.shipping_queries_queue,
query,
"UpdateKind::ShippingQuery",
),
UpdateKind::PreCheckoutQuery(query) => send(
&self.requester,
&self.pre_checkout_queries_queue,
query,
"UpdateKind::PreCheckoutQuery",
),
UpdateKind::Poll(poll) => {
send(&self.requester, &self.polls_queue, poll, "UpdateKind::Poll")
}
UpdateKind::PollAnswer(answer) => send(
&self.requester,
&self.poll_answers_queue,
answer,
"UpdateKind::PollAnswer",
),
UpdateKind::MyChatMember(chat_member_updated) => send(
&self.requester,
&self.my_chat_members_queue,
chat_member_updated,
"UpdateKind::MyChatMember",
),
UpdateKind::ChatMember(chat_member_updated) => send(
&self.requester,
&self.chat_members_queue,
chat_member_updated,
"UpdateKind::MyChatMember",
),
}
}
}
fn hint_allowed_updates<E>(&self, listener: &mut impl UpdateListener<E>) {
fn hint_handler_allowed_update<T>(
queue: &Option<T>,
kind: AllowedUpdate,
) -> std::option::IntoIter<AllowedUpdate> {
queue.as_ref().map(|_| kind).into_iter()
}
let mut allowed = hint_handler_allowed_update(&self.messages_queue, AllowedUpdate::Message)
.chain(hint_handler_allowed_update(
&self.edited_messages_queue,
AllowedUpdate::EditedMessage,
))
.chain(hint_handler_allowed_update(
&self.channel_posts_queue,
AllowedUpdate::ChannelPost,
))
.chain(hint_handler_allowed_update(
&self.edited_channel_posts_queue,
AllowedUpdate::EditedChannelPost,
))
.chain(hint_handler_allowed_update(
&self.inline_queries_queue,
AllowedUpdate::InlineQuery,
))
.chain(hint_handler_allowed_update(
&self.chosen_inline_results_queue,
AllowedUpdate::ChosenInlineResult,
))
.chain(hint_handler_allowed_update(
&self.callback_queries_queue,
AllowedUpdate::CallbackQuery,
))
.chain(hint_handler_allowed_update(
&self.shipping_queries_queue,
AllowedUpdate::ShippingQuery,
))
.chain(hint_handler_allowed_update(
&self.pre_checkout_queries_queue,
AllowedUpdate::PreCheckoutQuery,
))
.chain(hint_handler_allowed_update(&self.polls_queue, AllowedUpdate::Poll))
.chain(hint_handler_allowed_update(&self.poll_answers_queue, AllowedUpdate::PollAnswer))
.chain(hint_handler_allowed_update(
&self.my_chat_members_queue,
AllowedUpdate::MyChatMember,
))
.chain(hint_handler_allowed_update(
&self.chat_members_queue,
AllowedUpdate::ChatMember,
));
listener.hint_allowed_updates(&mut allowed);
}
async fn wait_for_handlers(&mut self) {
log::debug!("Waiting for handlers to finish");
// Drop all senders, so handlers can stop
self.messages_queue.take();
self.edited_messages_queue.take();
self.channel_posts_queue.take();
self.edited_channel_posts_queue.take();
self.inline_queries_queue.take();
self.chosen_inline_results_queue.take();
self.callback_queries_queue.take();
self.shipping_queries_queue.take();
self.pre_checkout_queries_queue.take();
self.polls_queue.take();
self.poll_answers_queue.take();
self.my_chat_members_queue.take();
self.chat_members_queue.take();
// Wait untill all handlers finish
self.running_handlers.by_ref().for_each(|_| async {}).await;
}
}
/// This error is returned from [`ShutdownToken::shutdown`] when trying to
/// shutdown an idle [`Dispatcher`].
#[derive(Debug)]
pub struct IdleShutdownError;
impl fmt::Display for IdleShutdownError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Dispatcher was idle and as such couldn't be shut down")
}
}
impl std::error::Error for IdleShutdownError {}
/// A token which used to shutdown [`Dispatcher`].
#[derive(Clone)]
pub struct ShutdownToken {
dispatcher_state: Arc<DispatcherState>,
shutdown_notify_back: Arc<Notify>,
}
impl ShutdownToken {
/// Tries to shutdown dispatching.
///
/// Returns an error if the dispatcher is idle at the moment.
///
/// If you don't need to wait for shutdown, the returned future can be
/// ignored.
pub fn shutdown(&self) -> Result<impl Future<Output = ()> + '_, IdleShutdownError> {
shutdown_inner(&self.dispatcher_state).map(|()| async move {
log::info!("Trying to shutdown the dispatcher...");
self.shutdown_notify_back.notified().await
})
}
}
struct DispatcherState {
inner: AtomicU8,
}
impl DispatcherState {
fn load(&self) -> ShutdownState {
ShutdownState::from_u8(self.inner.load(Ordering::SeqCst))
}
fn store(&self, new: ShutdownState) {
self.inner.store(new as _, Ordering::SeqCst)
}
fn compare_exchange(
&self,
current: ShutdownState,
new: ShutdownState,
) -> Result<ShutdownState, ShutdownState> {
self.inner
.compare_exchange(current as _, new as _, Ordering::SeqCst, Ordering::SeqCst)
.map(ShutdownState::from_u8)
.map_err(ShutdownState::from_u8)
}
}
impl Default for DispatcherState {
fn default() -> Self {
Self { inner: AtomicU8::new(ShutdownState::Idle as _) }
}
}
#[repr(u8)]
#[derive(Debug)]
enum ShutdownState {
Running,
ShuttingDown,
Idle,
}
impl ShutdownState {
fn from_u8(n: u8) -> Self {
const RUNNING: u8 = ShutdownState::Running as u8;
const SHUTTING_DOWN: u8 = ShutdownState::ShuttingDown as u8;
const IDLE: u8 = ShutdownState::Idle as u8;
match n {
RUNNING => ShutdownState::Running,
SHUTTING_DOWN => ShutdownState::ShuttingDown,
IDLE => ShutdownState::Idle,
_ => unreachable!(),
}
}
}
fn shutdown_check_timeout_for<E>(update_listener: &impl UpdateListener<E>) -> Duration {
const MIN_SHUTDOWN_CHECK_TIMEOUT: Duration = Duration::from_secs(1);
// FIXME: replace this by just Duration::ZERO once 1.53 will be released
const DZERO: Duration = Duration::from_secs(0);
let shutdown_check_timeout = update_listener.timeout_hint().unwrap_or(DZERO);
// FIXME: replace this by just saturating_add once 1.53 will be released
shutdown_check_timeout.checked_add(MIN_SHUTDOWN_CHECK_TIMEOUT).unwrap_or(shutdown_check_timeout)
}
fn shutdown_inner(state: &DispatcherState) -> Result<(), IdleShutdownError> {
use ShutdownState::*;
let res = state.compare_exchange(Running, ShuttingDown);
match res {
Ok(_) | Err(ShuttingDown) => Ok(()),
Err(Idle) => Err(IdleShutdownError),
Err(Running) => unreachable!(),
}
}
fn send<'a, R, Upd>(requester: &'a R, tx: &'a Tx<R, Upd>, update: Upd, variant: &'static str)
where
Upd: Debug,
R: Requester + Clone,
{
if let Some(tx) = tx {
if let Err(error) = tx.send(UpdateWithCx { requester: requester.clone(), update }) {
log::error!(
"The RX part of the {} channel is closed, but an update is received.\nError:{}\n",
variant,
error
);
}
}
}

View file

@ -27,7 +27,7 @@
//! that:
//! - You are able to supply [`DialogueDispatcher`] as a handler.
//! - You are able to supply functions that accept
//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future<Output = ()`
//! [`tokio::sync::mpsc::UnboundedReceiver`] and return `Future<Output = ()>`
//! as a handler.
//!
//! Since they implement [`DispatcherHandler`] too.
@ -46,14 +46,17 @@
//! [examples/dialogue_bot]: https://github.com/teloxide/teloxide/tree/master/examples/dialogue_bot
pub mod dialogue;
pub mod stop_token;
pub mod update_listeners;
pub(crate) mod repls;
mod dispatcher;
mod dispatcher_handler;
mod dispatcher_handler_rx_ext;
pub(crate) mod repls;
pub mod update_listeners;
mod update_with_cx;
pub use dispatcher::Dispatcher;
pub use dispatcher::{Dispatcher, IdleShutdownError, ShutdownToken};
pub use dispatcher_handler::DispatcherHandler;
pub use dispatcher_handler_rx_ext::DispatcherHandlerRxExt;
use tokio::sync::mpsc::UnboundedReceiver;

View file

@ -22,6 +22,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl<R, Cmd, H, Fut, HandlerE, N>(requester: R, bot_name: N, handler: H)
where
Cmd: BotCommand + Send + 'static,
@ -39,7 +40,7 @@ where
requester,
bot_name,
handler,
update_listeners::polling_default(cloned_requester),
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
@ -56,6 +57,7 @@ where
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`commands_repl`]: crate::dispatching::repls::commands_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, HandlerE, N>(
requester: R,
bot_name: N,
@ -87,6 +89,7 @@ pub async fn commands_repl_with_listener<'a, R, Cmd, H, Fut, L, ListenerE, Handl
},
)
})
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -1,13 +1,13 @@
use crate::{
dispatching::{
dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx},
dialogue::{DialogueDispatcher, DialogueStage, DialogueWithCx, InMemStorageError},
update_listeners,
update_listeners::UpdateListener,
Dispatcher, UpdateWithCx,
},
error_handlers::LoggingErrorHandler,
};
use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc};
use std::{fmt::Debug, future::Future, sync::Arc};
use teloxide_core::{requests::Requester, types::Message};
/// A [REPL] for dialogues.
@ -23,10 +23,11 @@ use teloxide_core::{requests::Requester, types::Message};
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl<'a, R, H, D, Fut>(requester: R, handler: H)
where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Default + Send + 'static,
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
@ -36,7 +37,7 @@ where
dialogues_repl_with_listener(
requester,
handler,
update_listeners::polling_default(cloned_requester),
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
@ -55,13 +56,14 @@ where
/// [`dialogues_repl`]: crate::dispatching::repls::dialogues_repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
#[cfg(feature = "ctrlc_handler")]
pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
requester: R,
handler: H,
listener: L,
) where
H: Fn(UpdateWithCx<R, Message>, D) -> Fut + Send + Sync + 'static,
D: Default + Send + 'static,
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
L: UpdateListener<ListenerE> + Send + 'a,
ListenerE: Debug + Send + 'a,
@ -71,7 +73,12 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
Dispatcher::new(requester)
.messages_handler(DialogueDispatcher::new(
move |DialogueWithCx { cx, dialogue }: DialogueWithCx<R, Message, D, Infallible>| {
move |DialogueWithCx { cx, dialogue }: DialogueWithCx<
R,
Message,
D,
InMemStorageError,
>| {
let handler = Arc::clone(&handler);
async move {
@ -80,6 +87,7 @@ pub async fn dialogues_repl_with_listener<'a, R, H, D, Fut, L, ListenerE>(
}
},
))
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -21,6 +21,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
///
/// [REPL]: https://en.wikipedia.org/wiki/Read-eval-print_loop
/// [`Dispatcher`]: crate::dispatching::Dispatcher
#[cfg(feature = "ctrlc_handler")]
pub async fn repl<R, H, Fut, E>(requester: R, handler: H)
where
H: Fn(UpdateWithCx<R, Message>) -> Fut + Send + Sync + 'static,
@ -31,8 +32,12 @@ where
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let cloned_requester = requester.clone();
repl_with_listener(requester, handler, update_listeners::polling_default(cloned_requester))
.await;
repl_with_listener(
requester,
handler,
update_listeners::polling_default(cloned_requester).await,
)
.await;
}
/// Like [`repl`], but with a custom [`UpdateListener`].
@ -47,6 +52,7 @@ where
/// [`Dispatcher`]: crate::dispatching::Dispatcher
/// [`repl`]: crate::dispatching::repls::repl()
/// [`UpdateListener`]: crate::dispatching::update_listeners::UpdateListener
#[cfg(feature = "ctrlc_handler")]
pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>(
requester: R,
handler: H,
@ -72,6 +78,7 @@ pub async fn repl_with_listener<'a, R, H, Fut, E, L, ListenerE>(
}
})
})
.setup_ctrlc_handler()
.dispatch_with_listener(
listener,
LoggingErrorHandler::with_custom_text("An error from the update listener"),

View file

@ -0,0 +1,76 @@
//! A stop token used to stop a listener.
use std::{future::Future, pin::Pin, task};
use futures::future::{pending, AbortHandle, Abortable, Pending};
/// A stop token allows you to stop a listener.
///
/// See also: [`UpdateListener::stop_token`].
///
/// [`UpdateListener::stop_token`]:
/// crate::dispatching::update_listeners::UpdateListener::stop_token
pub trait StopToken {
/// Stop the listener linked to this token.
fn stop(self);
}
/// A stop token which does nothing. May be used in prototyping or in cases
/// where you do not care about graceful shutdowning.
pub struct Noop;
impl StopToken for Noop {
fn stop(self) {}
}
/// A stop token which corresponds to [`AsyncStopFlag`].
#[derive(Clone)]
pub struct AsyncStopToken(AbortHandle);
/// A flag which corresponds to [`AsyncStopToken`].
///
/// To know if the stop token was used you can either repeatedly call
/// [`is_stopped`] or use this type as a `Future`.
///
/// [`is_stopped`]: AsyncStopFlag::is_stopped
#[pin_project::pin_project]
pub struct AsyncStopFlag(#[pin] Abortable<Pending<()>>);
impl AsyncStopToken {
/// Create a new token/flag pair.
pub fn new_pair() -> (Self, AsyncStopFlag) {
let (handle, reg) = AbortHandle::new_pair();
let token = Self(handle);
let flag = AsyncStopFlag(Abortable::new(pending(), reg));
(token, flag)
}
}
impl StopToken for AsyncStopToken {
fn stop(self) {
self.0.abort()
}
}
impl AsyncStopFlag {
/// Returns true if the stop token linked to `self` was used.
pub fn is_stopped(&self) -> bool {
self.0.is_aborted()
}
}
/// This future resolves when a stop token was used.
impl Future for AsyncStopFlag {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().0.poll(cx).map(|res| {
debug_assert!(
res.is_err(),
"Pending Future can't ever be resolved, so Abortable is only resolved when \
canceled"
);
})
}
}

View file

@ -96,107 +96,102 @@
//!
//! [`UpdateListener`]: UpdateListener
//! [`polling_default`]: polling_default
//! [`polling`]: polling
//! [`polling`]: polling()
//! [`Box::get_updates`]: crate::requests::Requester::get_updates
//! [getting updates]: https://core.telegram.org/bots/api#getting-updates
//! [long]: https://en.wikipedia.org/wiki/Push_technology#Long_polling
//! [short]: https://en.wikipedia.org/wiki/Polling_(computer_science)
//! [webhook]: https://en.wikipedia.org/wiki/Webhook
use futures::{stream, Stream, StreamExt};
use futures::Stream;
use std::{convert::TryInto, time::Duration};
use teloxide_core::{
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
use std::time::Duration;
use crate::{
dispatching::stop_token::StopToken,
types::{AllowedUpdate, Update},
};
/// A generic update listener.
pub trait UpdateListener<E>: Stream<Item = Result<Update, E>> {
// TODO: add some methods here (.shutdown(), etc).
}
impl<S, E> UpdateListener<E> for S where S: Stream<Item = Result<Update, E>> {}
mod polling;
mod stateful_listener;
/// Returns a long polling update listener with `timeout` of 10 seconds.
pub use self::{
polling::{polling, polling_default},
stateful_listener::StatefulListener,
};
/// An update listener.
///
/// See also: [`polling`](polling).
pub fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
polling(requester, Some(Duration::from_secs(10)), None, None)
/// Implementors of this trait allow getting updates from Telegram.
///
/// Currently Telegram has 2 ways of getting updates -- [polling] and
/// [webhooks]. Currently, only the former one is implemented (see [`polling()`]
/// and [`polling_default`])
///
/// Some functions of this trait are located in the supertrait
/// ([`AsUpdateStream`]), see also:
/// - [`AsUpdateStream::Stream`]
/// - [`AsUpdateStream::as_stream`]
///
/// [polling]: self#long-polling
/// [webhooks]: self#webhooks
pub trait UpdateListener<E>: for<'a> AsUpdateStream<'a, E> {
/// The type of token which allows to stop this listener.
type StopToken: StopToken;
/// Returns a token which stops this listener.
///
/// The [`stop`] function of the token is not guaranteed to have an
/// immediate effect. That is, some listeners can return updates even
/// after [`stop`] is called (e.g.: because of buffering).
///
/// [`stop`]: StopToken::stop
///
/// Implementors of this function are encouraged to stop listening for
/// updates as soon as possible and return `None` from the update stream as
/// soon as all cached updates are returned.
#[must_use = "This function doesn't stop listening, to stop listening you need to call stop on \
the returned token"]
fn stop_token(&mut self) -> Self::StopToken;
/// Hint which updates should the listener listen for.
///
/// For example [`polling()`] should send the hint as
/// [`GetUpdates::allowed_updates`]
///
/// Note however that this is a _hint_ and as such, it can be ignored. The
/// listener is not guaranteed to only return updates which types are listed
/// in the hint.
///
/// [`GetUpdates::allowed_updates`]:
/// crate::payloads::GetUpdates::allowed_updates
fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) {
let _ = hint;
}
/// The timeout duration hint.
///
/// This hints how often dispatcher should check for a shutdown. E.g., for
/// [`polling()`] this returns the [`timeout`].
///
/// [`timeout`]: crate::payloads::GetUpdates::timeout
///
/// If you are implementing this trait and not sure what to return from this
/// function, just leave it with the default implementation.
fn timeout_hint(&self) -> Option<Duration> {
None
}
}
/// Returns a long/short polling update listener with some additional options.
/// [`UpdateListener`]'s supertrait/extension.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::payloads::GetUpdates
pub fn polling<R>(
requester: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
let timeout = timeout.map(|t| t.as_secs().try_into().expect("timeout is too big"));
/// This trait is a workaround to not require GAT.
pub trait AsUpdateStream<'a, E> {
/// The stream of updates from Telegram.
type Stream: Stream<Item = Result<Update, E>> + 'a;
stream::unfold(
(allowed_updates, requester, 0),
move |(mut allowed_updates, bot, mut offset)| async move {
let mut req = bot.get_updates_fault_tolerant();
let payload = &mut req.payload_mut().0;
payload.offset = Some(offset);
payload.timeout = timeout;
payload.limit = limit;
payload.allowed_updates = allowed_updates.take();
let updates = match req.send().await {
Err(err) => vec![Err(err)],
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
offset = id + 1;
}
for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok).collect::<Vec<_>>()
}
};
Some((stream::iter(updates), (allowed_updates, bot, offset)))
},
)
.flatten()
/// Creates the update [`Stream`].
///
/// [`Stream`]: AsUpdateStream::Stream
fn as_stream(&'a mut self) -> Self::Stream;
}

View file

@ -0,0 +1,179 @@
use std::{convert::TryInto, time::Duration};
use futures::{
future::{ready, Either},
stream::{self, Stream, StreamExt},
};
use crate::{
dispatching::{
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener},
},
payloads::GetUpdates,
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
};
/// Returns a long polling update listener with `timeout` of 10 seconds.
///
/// See also: [`polling`](polling).
///
/// ## Notes
///
/// This function will automatically delete a webhook if it was set up.
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
}
/// Returns a long/short polling update listener with some additional options.
///
/// - `bot`: Using this bot, the returned update listener will receive updates.
/// - `timeout`: A timeout for polling.
/// - `limit`: Limits the number of updates to be retrieved at once. Values
/// between 1—100 are accepted.
/// - `allowed_updates`: A list the types of updates you want to receive.
/// See [`GetUpdates`] for defaults.
///
/// See also: [`polling_default`](polling_default).
///
/// [`GetUpdates`]: crate::payloads::GetUpdates
pub fn polling<R>(
requester: R,
timeout: Option<Duration>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
) -> impl UpdateListener<R::Err>
where
R: Requester + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
{
struct State<B: Requester> {
bot: B,
timeout: Option<u32>,
limit: Option<u8>,
allowed_updates: Option<Vec<AllowedUpdate>>,
offset: i32,
flag: AsyncStopFlag,
token: AsyncStopToken,
}
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + '_
where
B: Requester,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;
if flag.is_stopped() {
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: Some(0),
limit: Some(1),
allowed_updates: allowed_updates.take(),
};
return match req.send().await {
Ok(_) => None,
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
};
}
let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};
let updates = match req.send().await {
Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)),
Ok(SemiparsedVec(updates)) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};
*offset = id + 1;
}
for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
}
updates.into_iter().filter_map(Result::ok).map(Ok)
}
};
Some((Either::Right(stream::iter(updates)), state))
})
.flatten()
}
let (token, flag) = AsyncStopToken::new_pair();
let state = State {
bot: requester,
timeout: timeout.map(|t| t.as_secs().try_into().expect("timeout is too big")),
limit,
allowed_updates,
offset: 0,
flag,
token,
};
let stop_token = |st: &mut State<_>| st.token.clone();
let hint_allowed_updates =
Some(|state: &mut State<_>, allowed: &mut dyn Iterator<Item = AllowedUpdate>| {
// TODO: we should probably warn if there already were different allowed updates
// before
state.allowed_updates = Some(allowed.collect());
});
let timeout_hint = Some(move |_: &State<_>| timeout);
StatefulListener::new_with_hints(state, stream, stop_token, hint_allowed_updates, timeout_hint)
}
async fn delete_webhook_if_setup<R>(requester: &R)
where
R: Requester,
{
let webhook_info = match requester.get_webhook_info().send().await {
Ok(ok) => ok,
Err(e) => {
log::error!("Failed to get webhook info: {:?}", e);
return;
}
};
let is_webhook_setup = !webhook_info.url.is_empty();
if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
log::error!("Failed to delete a webhook: {:?}", e);
}
}
}

View file

@ -0,0 +1,153 @@
use std::time::Duration;
use futures::Stream;
use crate::{
dispatching::{
stop_token::{self, StopToken},
update_listeners::{AsUpdateStream, UpdateListener},
},
types::{AllowedUpdate, Update},
};
/// A listener created from functions.
///
/// This type allows to turn a stream of updates (+ some additional functions)
/// into an [`UpdateListener`].
///
/// For an example of usage, see [`polling`].
///
/// [`polling`]: crate::dispatching::update_listeners::polling()
#[non_exhaustive]
pub struct StatefulListener<St, Assf, Sf, Hauf, Thf> {
/// The state of the listener.
pub state: St,
/// The function used as [`AsUpdateStream::as_stream`].
///
/// Must be of type `for<'a> &'a mut St -> impl Stream + 'a` and callable by
/// `&mut`.
pub stream: Assf,
/// The function used as [`UpdateListener::stop_token`].
///
/// Must be of type `for<'a> &'a mut St -> impl StopToken`.
pub stop_token: Sf,
/// The function used as [`UpdateListener::hint_allowed_updates`].
///
/// Must be of type `for<'a, 'b> &'a mut St, &'b mut dyn Iterator<Item =
/// AllowedUpdate> -> ()`.
pub hint_allowed_updates: Option<Hauf>,
/// The function used as [`UpdateListener::timeout_hint`].
///
/// Must be of type `for<'a> &'a St -> Option<Duration>` and callable by
/// `&`.
pub timeout_hint: Option<Thf>,
}
type Haufn<State> = for<'a, 'b> fn(&'a mut State, &'b mut dyn Iterator<Item = AllowedUpdate>);
type Thfn<State> = for<'a> fn(&'a State) -> Option<Duration>;
impl<St, Assf, Sf> StatefulListener<St, Assf, Sf, Haufn<St>, Thfn<St>> {
/// Creates a new stateful listener from its components.
pub fn new(state: St, stream: Assf, stop_token: Sf) -> Self {
Self::new_with_hints(state, stream, stop_token, None, None)
}
}
impl<St, Assf, Sf, Hauf, Thf> StatefulListener<St, Assf, Sf, Hauf, Thf> {
/// Creates a new stateful listener from its components.
pub fn new_with_hints(
state: St,
stream: Assf,
stop_token: Sf,
hint_allowed_updates: Option<Hauf>,
timeout_hint: Option<Thf>,
) -> Self {
Self { state, stream, stop_token, hint_allowed_updates, timeout_hint }
}
}
impl<S, E>
StatefulListener<
S,
for<'a> fn(&'a mut S) -> &'a mut S,
for<'a> fn(&'a mut S) -> stop_token::Noop,
Haufn<S>,
Thfn<S>,
>
where
S: Stream<Item = Result<Update, E>> + Unpin + 'static,
{
/// Creates a new update listener from a stream of updates which ignores
/// stop signals.
///
/// It won't be possible to ever stop this listener with a stop token.
pub fn from_stream_without_graceful_shutdown(stream: S) -> Self {
let this = Self::new_with_hints(
stream,
|s| s,
|_| stop_token::Noop,
None,
Some(|_| {
// FIXME: replace this by just Duration::MAX once 1.53 releases
// be released
const NANOS_PER_SEC: u32 = 1_000_000_000;
let dmax = Duration::new(u64::MAX, NANOS_PER_SEC - 1);
Some(dmax)
}),
);
assert_update_listener(this)
}
}
impl<'a, St, Assf, Sf, Hauf, Thf, Strm, E> AsUpdateStream<'a, E>
for StatefulListener<St, Assf, Hauf, Sf, Thf>
where
(St, Strm): 'a,
Assf: FnMut(&'a mut St) -> Strm,
Strm: Stream<Item = Result<Update, E>>,
{
type Stream = Strm;
fn as_stream(&'a mut self) -> Self::Stream {
(self.stream)(&mut self.state)
}
}
impl<St, Assf, Sf, Hauf, Stt, Thf, E> UpdateListener<E>
for StatefulListener<St, Assf, Sf, Hauf, Thf>
where
Self: for<'a> AsUpdateStream<'a, E>,
Sf: FnMut(&mut St) -> Stt,
Stt: StopToken,
Hauf: FnMut(&mut St, &mut dyn Iterator<Item = AllowedUpdate>),
Thf: Fn(&St) -> Option<Duration>,
{
type StopToken = Stt;
fn stop_token(&mut self) -> Stt {
(self.stop_token)(&mut self.state)
}
fn hint_allowed_updates(&mut self, hint: &mut dyn Iterator<Item = AllowedUpdate>) {
if let Some(f) = &mut self.hint_allowed_updates {
f(&mut self.state, hint);
}
}
fn timeout_hint(&self) -> Option<Duration> {
self.timeout_hint.as_ref().and_then(|f| f(&self.state))
}
}
fn assert_update_listener<L, E>(l: L) -> L
where
L: UpdateListener<E>,
{
l
}

View file

@ -1,6 +1,11 @@
use crate::dispatching::dialogue::GetChatId;
use teloxide_core::{
payloads::SendMessageSetters,
payloads::{
SendAnimationSetters, SendAudioSetters, SendContactSetters, SendDocumentSetters,
SendLocationSetters, SendMediaGroupSetters, SendMessageSetters, SendPhotoSetters,
SendStickerSetters, SendVenueSetters, SendVideoNoteSetters, SendVideoSetters,
SendVoiceSetters,
},
requests::{Request, Requester},
types::{ChatId, InputFile, InputMedia, Message},
};
@ -64,6 +69,87 @@ where
self.requester.send_message(self.chat_id(), text).reply_to_message_id(self.update.id)
}
pub fn reply_audio(&self, audio: InputFile) -> R::SendAudio {
self.requester.send_audio(self.update.chat.id, audio).reply_to_message_id(self.update.id)
}
pub fn reply_animation(&self, animation: InputFile) -> R::SendAnimation {
self.requester
.send_animation(self.update.chat.id, animation)
.reply_to_message_id(self.update.id)
}
pub fn reply_document(&self, document: InputFile) -> R::SendDocument {
self.requester
.send_document(self.update.chat.id, document)
.reply_to_message_id(self.update.id)
}
pub fn reply_photo(&self, photo: InputFile) -> R::SendPhoto {
self.requester.send_photo(self.update.chat.id, photo).reply_to_message_id(self.update.id)
}
pub fn reply_video(&self, video: InputFile) -> R::SendVideo {
self.requester.send_video(self.update.chat.id, video).reply_to_message_id(self.update.id)
}
pub fn reply_voice(&self, voice: InputFile) -> R::SendVoice {
self.requester.send_voice(self.update.chat.id, voice).reply_to_message_id(self.update.id)
}
pub fn reply_media_group<T>(&self, media_group: T) -> R::SendMediaGroup
where
T: IntoIterator<Item = InputMedia>,
{
self.requester
.send_media_group(self.update.chat.id, media_group)
.reply_to_message_id(self.update.id)
}
pub fn reply_location(&self, latitude: f64, longitude: f64) -> R::SendLocation {
self.requester
.send_location(self.update.chat.id, latitude, longitude)
.reply_to_message_id(self.update.id)
}
pub fn reply_venue<T, U>(
&self,
latitude: f64,
longitude: f64,
title: T,
address: U,
) -> R::SendVenue
where
T: Into<String>,
U: Into<String>,
{
self.requester
.send_venue(self.update.chat.id, latitude, longitude, title, address)
.reply_to_message_id(self.update.id)
}
pub fn reply_video_note(&self, video_note: InputFile) -> R::SendVideoNote {
self.requester
.send_video_note(self.update.chat.id, video_note)
.reply_to_message_id(self.update.id)
}
pub fn reply_contact<T, U>(&self, phone_number: T, first_name: U) -> R::SendContact
where
T: Into<String>,
U: Into<String>,
{
self.requester
.send_contact(self.update.chat.id, phone_number, first_name)
.reply_to_message_id(self.update.id)
}
pub fn reply_sticker(&self, sticker: InputFile) -> R::SendSticker {
self.requester
.send_sticker(self.update.chat.id, sticker)
.reply_to_message_id(self.update.id)
}
pub fn answer_photo(&self, photo: InputFile) -> R::SendPhoto {
self.requester.send_photo(self.update.chat.id, photo)
}

27
src/features.txt Normal file
View file

@ -0,0 +1,27 @@
## Cargo features
| Feature | Description |
|----------|----------|
| `redis-storage` | Enables the [Redis] storage support for dialogues.|
| `sqlite-storage` | Enables the [Sqlite] storage support for dialogues. |
| `cbor-serializer` | Enables the [CBOR] serializer for dialogues. |
| `bincode-serializer` | Enables the [Bincode] serializer for dialogues. |
| `macros` | Re-exports macros from [`teloxide-macros`]. |
| `native-tls` | Enables the [`native-tls`] TLS implementation (enabled by default). |
| `rustls` | Enables the [`rustls`] TLS implementation. |
| `ctrlc_handler` | Enables the [`Dispatcher::setup_ctrlc_handler`](dispatching::Dispatcher::setup_ctrlc_handler) function. |
| `auto-send` | Enables the `AutoSend` bot adaptor. |
| `cache-me` | Enables the `CacheMe` bot adaptor. |
| `frunk` | Enables [`teloxide::utils::UpState`]. |
| `full` | Enables all the features except `nightly`. |
| `nightly` | Enables nightly-only features (see the [teloxide-core features]). |
[Redis]: https://redis.io/
[Sqlite]: https://www.sqlite.org/
[CBOR]: https://en.wikipedia.org/wiki/CBOR
[Bincode]: https://github.com/servo/bincode
[`teloxide-macros`]: https://github.com/teloxide/teloxide-macros
[`native-tls`]: https://docs.rs/native-tls
[`rustls`]: https://docs.rs/rustls
[`teloxide::utils::UpState`]: utils::UpState
[teloxide-core features]: https://docs.rs/teloxide-core/latest/teloxide_core/#cargo-features

View file

@ -33,6 +33,12 @@
//! [`async`/`.await`]: https://rust-lang.github.io/async-book/01_getting_started/01_chapter.html
//! [Rust]: https://www.rust-lang.org/
// This hack is used to cancel formatting for a Markdown table. See [1], [2], and [3].
//
// [1]: https://github.com/rust-lang/rustfmt/issues/4210
// [2]: https://github.com/rust-lang/rustfmt/issues/4787
// [3]: https://github.com/rust-lang/rust/issues/82768#issuecomment-803935643
#![cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("features.txt")))]
// https://github.com/teloxide/teloxide/raw/master/logo.svg doesn't work in html_logo_url, I don't know why.
#![doc(
html_logo_url = "https://github.com/teloxide/teloxide/raw/master/ICON.png",
@ -40,14 +46,19 @@
)]
#![allow(clippy::match_bool)]
#![forbid(unsafe_code)]
#![cfg_attr(all(feature = "nightly", doctest), feature(external_doc))]
// we pass "--cfg docsrs" when building docs to add `This is supported on feature="..." only.`
// We pass "--cfg docsrs" when building docs to add `This is supported on
// feature="..." only.`
//
// "--cfg dep_docsrs" is used for the same reason, but for `teloxide-core`.
//
// To properly build docs of this crate run
// ```console
// $ RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --open --all-features
// $ RUSTFLAGS="--cfg dep_docsrs" RUSTDOCFLAGS="--cfg docsrs -Znormalize-docs" cargo +nightly doc --open --all-features
// ```
#![cfg_attr(all(docsrs, feature = "nightly"), feature(doc_cfg))]
#![allow(clippy::redundant_pattern_matching)]
// https://github.com/rust-lang/rust-clippy/issues/7422
#![allow(clippy::nonstandard_macro_braces)]
pub use dispatching::repls::{
commands_repl, commands_repl_with_listener, dialogues_repl, dialogues_repl_with_listener, repl,
@ -73,7 +84,7 @@ pub use teloxide_macros as macros;
pub use teloxide_macros::teloxide;
#[cfg(all(feature = "nightly", doctest))]
#[doc(include = "../README.md")]
#[cfg_attr(feature = "nightly", cfg_attr(feature = "nightly", doc = include_str!("../README.md")))]
enum ReadmeDocTests {}
use teloxide_core::requests::ResponseResult;

View file

@ -1,7 +1,7 @@
/// Enables logging through [pretty-env-logger].
///
/// A logger will **only** print errors from teloxide and **all** logs from
/// your program.
/// A logger will **only** print errors, warnings, and general information from
/// teloxide and **all** logs from your program.
///
/// # Example
/// ```no_compile
@ -23,8 +23,8 @@ macro_rules! enable_logging {
/// Enables logging through [pretty-env-logger] with a custom filter for your
/// program.
///
/// A logger will **only** print errors from teloxide and restrict logs from
/// your program by the specified filter.
/// A logger will **only** print errors, warnings, and general information from
/// teloxide and restrict logs from your program by the specified filter.
///
/// # Example
/// Allow printing all logs from your program up to [`LevelFilter::Debug`] (i.e.
@ -46,7 +46,7 @@ macro_rules! enable_logging_with_filter {
pretty_env_logger::formatted_builder()
.write_style(pretty_env_logger::env_logger::WriteStyle::Auto)
.filter(Some(&env!("CARGO_PKG_NAME").replace("-", "_")), $filter)
.filter(Some("teloxide"), log::LevelFilter::Error)
.filter(Some("teloxide"), log::LevelFilter::Info)
.init();
};
}

View file

@ -76,7 +76,7 @@ pub use teloxide_macros::BotCommand;
/// # }
/// ```
///
/// ## Enum attributes
/// # Enum attributes
/// 1. `#[command(rename = "rule")]`
/// Rename all commands by `rule`. Allowed rules are `lowercase`. If you will
/// not use this attribute, commands will be parsed by their original names.
@ -93,7 +93,7 @@ pub use teloxide_macros::BotCommand;
/// after the first space into the first argument, which must implement
/// [`FromStr`].
///
/// ### Example
/// ## Example
/// ```
/// # #[cfg(feature = "macros")] {
/// use teloxide::utils::command::BotCommand;
@ -113,7 +113,7 @@ pub use teloxide_macros::BotCommand;
/// space character) and parses each part into the corresponding arguments,
/// which must implement [`FromStr`].
///
/// ### Example
/// ## Example
/// ```
/// # #[cfg(feature = "macros")] {
/// use teloxide::utils::command::BotCommand;
@ -133,7 +133,7 @@ pub use teloxide_macros::BotCommand;
/// Specify separator used by the `split` parser. It will be ignored when
/// accompanied by another type of parsers.
///
/// ### Example
/// ## Example
/// ```
/// # #[cfg(feature = "macros")] {
/// use teloxide::utils::command::BotCommand;
@ -149,20 +149,24 @@ pub use teloxide_macros::BotCommand;
/// # }
/// ```
///
/// ## Variant attributes
/// # Variant attributes
/// All variant attributes override the corresponding `enum` attributes.
///
/// 1. `#[command(rename = "rule")]`
/// Rename one command by a rule. Allowed rules are `lowercase`, `%some_name%`,
/// where `%some_name%` is any string, a new name.
///
/// 2. `#[command(parse_with = "parser")]`
/// 2. `#[command(description = "description")]`
/// Give your command a description. Write `"off"` for `"description"` to hide a
/// command.
///
/// 3. `#[command(parse_with = "parser")]`
/// One more option is available for variants.
/// - `custom_parser` - your own parser of the signature `fn(String) ->
/// Result<Tuple, ParseError>`, where `Tuple` corresponds to the variant's
/// arguments.
///
/// ### Example
/// ## Example
/// ```
/// # #[cfg(feature = "macros")] {
/// use teloxide::utils::command::{BotCommand, ParseError};
@ -191,11 +195,11 @@ pub use teloxide_macros::BotCommand;
/// # }
/// ```
///
/// 3. `#[command(prefix = "prefix")]`
/// 4. `#[command(description = "description")]`
/// 4. `#[command(prefix = "prefix")]`
/// 5. `#[command(separator = "sep")]`
///
/// Analogous to the descriptions above.
/// These attributes just override the corresponding `enum` attributes for a
/// specific variant.
///
/// [`FromStr`]: https://doc.rust-lang.org/std/str/trait.FromStr.html
/// [`BotCommand`]: crate::utils::command::BotCommand

View file

@ -44,7 +44,7 @@ pub fn link(url: &str, text: &str) -> String {
}
/// Builds an inline user mention link with an anchor.
pub fn user_mention(user_id: i32, text: &str) -> String {
pub fn user_mention(user_id: i64, text: &str) -> String {
link(format!("tg://user?id={}", user_id).as_str(), text)
}

View file

@ -59,7 +59,7 @@ pub fn link(url: &str, text: &str) -> String {
}
/// Builds an inline user mention link with an anchor.
pub fn user_mention(user_id: i32, text: &str) -> String {
pub fn user_mention(user_id: i64, text: &str) -> String {
link(format!("tg://user?id={}", user_id).as_str(), text)
}

View file

@ -1,3 +1,6 @@
// https://github.com/rust-lang/rust-clippy/issues/7422
#![allow(clippy::nonstandard_macro_braces)]
#[cfg(feature = "macros")]
use teloxide::utils::command::{BotCommand, ParseError};

View file

@ -1,9 +1,8 @@
use std::{
fmt::{Debug, Display},
future::Future,
sync::Arc,
};
use teloxide::dispatching::dialogue::{RedisStorage, Serializer, Storage};
use teloxide::dispatching::dialogue::{RedisStorage, RedisStorageError, Serializer, Storage};
#[tokio::test]
async fn test_redis_json() {
@ -40,32 +39,41 @@ async fn test_redis_cbor() {
type Dialogue = String;
macro_rules! test_dialogues {
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
assert_eq!(Arc::clone(&$storage).get_dialogue(1).await.unwrap(), $_0);
assert_eq!(Arc::clone(&$storage).get_dialogue(11).await.unwrap(), $_1);
assert_eq!(Arc::clone(&$storage).get_dialogue(256).await.unwrap(), $_2);
};
}
async fn test_redis<S>(storage: Arc<RedisStorage<S>>)
where
S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display,
{
check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await;
test_dialogues!(storage, None, None, None);
// 1 - ABC, 11 - DEF, 256 - GHI
Arc::clone(&storage).update_dialogue(1, "ABC".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(11, "DEF".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(256, "GHI".to_owned()).await.unwrap();
check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await;
check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await;
test_dialogues!(
storage,
Some("ABC".to_owned()),
Some("DEF".to_owned()),
Some("GHI".to_owned())
);
// 1 - GKL, 11 - DEF, 256 - MNO
Arc::clone(&storage).remove_dialogue(1).await.unwrap();
Arc::clone(&storage).remove_dialogue(11).await.unwrap();
Arc::clone(&storage).remove_dialogue(256).await.unwrap();
check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await;
check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await;
check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await;
}
async fn check_dialogue<E>(
expected: impl Into<Option<&str>>,
actual: impl Future<Output = Result<Option<Dialogue>, E>>,
) where
E: Debug,
{
assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap())
test_dialogues!(storage, None, None, None);
// Check that a try to remove a non-existing dialogue results in an error.
assert!(matches!(
Arc::clone(&storage).remove_dialogue(1).await.unwrap_err(),
RedisStorageError::DialogueNotFound
));
}

View file

@ -1,9 +1,8 @@
use std::{
fmt::{Debug, Display},
future::Future,
sync::Arc,
};
use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, Storage};
use teloxide::dispatching::dialogue::{Serializer, SqliteStorage, SqliteStorageError, Storage};
#[tokio::test(flavor = "multi_thread")]
async fn test_sqlite_json() {
@ -36,32 +35,41 @@ async fn test_sqlite_cbor() {
type Dialogue = String;
macro_rules! test_dialogues {
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
assert_eq!(Arc::clone(&$storage).get_dialogue(1).await.unwrap(), $_0);
assert_eq!(Arc::clone(&$storage).get_dialogue(11).await.unwrap(), $_1);
assert_eq!(Arc::clone(&$storage).get_dialogue(256).await.unwrap(), $_2);
};
}
async fn test_sqlite<S>(storage: Arc<SqliteStorage<S>>)
where
S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display,
{
check_dialogue(None, Arc::clone(&storage).update_dialogue(1, "ABC".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(11, "DEF".to_owned())).await;
check_dialogue(None, Arc::clone(&storage).update_dialogue(256, "GHI".to_owned())).await;
test_dialogues!(storage, None, None, None);
// 1 - ABC, 11 - DEF, 256 - GHI
Arc::clone(&storage).update_dialogue(1, "ABC".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(11, "DEF".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(256, "GHI".to_owned()).await.unwrap();
check_dialogue("ABC", Arc::clone(&storage).update_dialogue(1, "JKL".to_owned())).await;
check_dialogue("GHI", Arc::clone(&storage).update_dialogue(256, "MNO".to_owned())).await;
test_dialogues!(
storage,
Some("ABC".to_owned()),
Some("DEF".to_owned()),
Some("GHI".to_owned())
);
// 1 - GKL, 11 - DEF, 256 - MNO
Arc::clone(&storage).remove_dialogue(1).await.unwrap();
Arc::clone(&storage).remove_dialogue(11).await.unwrap();
Arc::clone(&storage).remove_dialogue(256).await.unwrap();
check_dialogue("JKL", Arc::clone(&storage).remove_dialogue(1)).await;
check_dialogue("DEF", Arc::clone(&storage).remove_dialogue(11)).await;
check_dialogue("MNO", Arc::clone(&storage).remove_dialogue(256)).await;
}
async fn check_dialogue<E>(
expected: impl Into<Option<&str>>,
actual: impl Future<Output = Result<Option<Dialogue>, E>>,
) where
E: Debug,
{
assert_eq!(expected.into().map(ToOwned::to_owned), actual.await.unwrap())
test_dialogues!(storage, None, None, None);
// Check that a try to remove a non-existing dialogue results in an error.
assert!(matches!(
Arc::clone(&storage).remove_dialogue(1).await.unwrap_err(),
SqliteStorageError::DialogueNotFound
));
}