mirror of
https://github.com/teloxide/teloxide.git
synced 2024-10-24 01:47:08 +02:00
commit
0c6a81e1b6
3 changed files with 29 additions and 24 deletions
|
@ -123,3 +123,4 @@ C: Into<String>, { ... }
|
||||||
## Misc
|
## Misc
|
||||||
1. Use `Into<...>` only where there exists at least one conversion **and** it will be logically to use.
|
1. Use `Into<...>` only where there exists at least one conversion **and** it will be logically to use.
|
||||||
2. Always mark a function as `#[must_use]` if its return value **must** be used.
|
2. Always mark a function as `#[must_use]` if its return value **must** be used.
|
||||||
|
3. `Box::pin(async [move] { ... })` instead of `async [move] { ... }.boxed()`.
|
||||||
|
|
|
@ -6,7 +6,7 @@ use crate::dispatching::{
|
||||||
};
|
};
|
||||||
use std::{convert::Infallible, marker::PhantomData};
|
use std::{convert::Infallible, marker::PhantomData};
|
||||||
|
|
||||||
use futures::{future::BoxFuture, StreamExt};
|
use futures::{future::BoxFuture, FutureExt, StreamExt};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use lockfree::map::Map;
|
use lockfree::map::Map;
|
||||||
|
@ -137,28 +137,30 @@ where
|
||||||
{
|
{
|
||||||
let this = Arc::new(self);
|
let this = Arc::new(self);
|
||||||
|
|
||||||
Box::pin(updates.for_each(move |cx| {
|
updates
|
||||||
let this = Arc::clone(&this);
|
.for_each(move |cx| {
|
||||||
let chat_id = cx.update.chat_id();
|
let this = Arc::clone(&this);
|
||||||
|
let chat_id = cx.update.chat_id();
|
||||||
|
|
||||||
match this.senders.get(&chat_id) {
|
match this.senders.get(&chat_id) {
|
||||||
// An old dialogue
|
// An old dialogue
|
||||||
Some(tx) => {
|
Some(tx) => {
|
||||||
if tx.1.send(cx).is_err() {
|
if tx.1.send(cx).is_err() {
|
||||||
panic!("We are not dropping a receiver or call .close() on it",);
|
panic!("We are not dropping a receiver or call .close() on it",);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let tx = this.new_tx();
|
||||||
|
if tx.send(cx).is_err() {
|
||||||
|
panic!("We are not dropping a receiver or call .close() on it",);
|
||||||
|
}
|
||||||
|
this.senders.insert(chat_id, tx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
None => {
|
|
||||||
let tx = this.new_tx();
|
|
||||||
if tx.send(cx).is_err() {
|
|
||||||
panic!("We are not dropping a receiver or call .close() on it",);
|
|
||||||
}
|
|
||||||
this.senders.insert(chat_id, tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async {}
|
async {}
|
||||||
}))
|
})
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ where
|
||||||
where
|
where
|
||||||
Self: Stream<Item = UpdateWithCx<Message>>,
|
Self: Stream<Item = UpdateWithCx<Message>>,
|
||||||
{
|
{
|
||||||
Box::pin(self.filter_map(|cx| async move { cx.update.text_owned().map(|text| (cx, text)) }))
|
self.filter_map(|cx| async move { cx.update.text_owned().map(|text| (cx, text)) }).boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn commands<C, N>(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx<Message>, C)>
|
fn commands<C, N>(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx<Message>, C)>
|
||||||
|
@ -41,10 +41,12 @@ where
|
||||||
{
|
{
|
||||||
let bot_name = bot_name.into();
|
let bot_name = bot_name.into();
|
||||||
|
|
||||||
Box::pin(self.text_messages().filter_map(move |(cx, text)| {
|
self.text_messages()
|
||||||
let bot_name = bot_name.clone();
|
.filter_map(move |(cx, text)| {
|
||||||
|
let bot_name = bot_name.clone();
|
||||||
|
|
||||||
async move { C::parse(&text, &bot_name).map(|command| (cx, command)).ok() }
|
async move { C::parse(&text, &bot_name).map(|command| (cx, command)).ok() }
|
||||||
}))
|
})
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue