mirror of
https://github.com/teloxide/teloxide.git
synced 2025-03-13 19:27:52 +01:00
Box::pin -> .boxed()
This commit is contained in:
parent
b097e1d8f1
commit
9cb44bef0d
7 changed files with 49 additions and 43 deletions
|
@ -6,7 +6,7 @@ use crate::dispatching::{
|
|||
};
|
||||
use std::{convert::Infallible, marker::PhantomData};
|
||||
|
||||
use futures::{future::BoxFuture, StreamExt};
|
||||
use futures::{future::BoxFuture, FutureExt, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use lockfree::map::Map;
|
||||
|
@ -137,28 +137,30 @@ where
|
|||
{
|
||||
let this = Arc::new(self);
|
||||
|
||||
Box::pin(updates.for_each(move |cx| {
|
||||
let this = Arc::clone(&this);
|
||||
let chat_id = cx.update.chat_id();
|
||||
updates
|
||||
.for_each(move |cx| {
|
||||
let this = Arc::clone(&this);
|
||||
let chat_id = cx.update.chat_id();
|
||||
|
||||
match this.senders.get(&chat_id) {
|
||||
// An old dialogue
|
||||
Some(tx) => {
|
||||
if tx.1.send(cx).is_err() {
|
||||
panic!("We are not dropping a receiver or call .close() on it",);
|
||||
match this.senders.get(&chat_id) {
|
||||
// An old dialogue
|
||||
Some(tx) => {
|
||||
if tx.1.send(cx).is_err() {
|
||||
panic!("We are not dropping a receiver or call .close() on it",);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let tx = this.new_tx();
|
||||
if tx.send(cx).is_err() {
|
||||
panic!("We are not dropping a receiver or call .close() on it",);
|
||||
}
|
||||
this.senders.insert(chat_id, tx);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let tx = this.new_tx();
|
||||
if tx.send(cx).is_err() {
|
||||
panic!("We are not dropping a receiver or call .close() on it",);
|
||||
}
|
||||
this.senders.insert(chat_id, tx);
|
||||
}
|
||||
}
|
||||
|
||||
async {}
|
||||
}))
|
||||
async {}
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::prelude::{DialogueStage, DialogueWithCx};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use std::{future::Future, sync::Arc};
|
||||
|
||||
/// An asynchronous handler of an update used in [`DialogueDispatcher`].
|
||||
|
@ -27,6 +27,6 @@ where
|
|||
where
|
||||
DialogueWithCx<Upd, D, E>: Send + 'static,
|
||||
{
|
||||
Box::pin(async move { self(cx).await })
|
||||
async move { self(cx).await }.boxed()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use super::Storage;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
|
@ -32,7 +32,7 @@ impl<D> Storage<D> for InMemStorage<D> {
|
|||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
Box::pin(async move { Ok(self.map.lock().await.remove(&chat_id)) })
|
||||
async move { Ok(self.map.lock().await.remove(&chat_id)) }.boxed()
|
||||
}
|
||||
|
||||
fn update_dialogue(
|
||||
|
@ -43,6 +43,6 @@ impl<D> Storage<D> for InMemStorage<D> {
|
|||
where
|
||||
D: Send + 'static,
|
||||
{
|
||||
Box::pin(async move { Ok(self.map.lock().await.insert(chat_id, dialogue)) })
|
||||
async move { Ok(self.map.lock().await.insert(chat_id, dialogue)) }.boxed()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use super::{serializer::Serializer, Storage};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use redis::{AsyncCommands, FromRedisValue, IntoConnectionInfo};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::{
|
||||
|
@ -57,7 +57,7 @@ where
|
|||
self: Arc<Self>,
|
||||
chat_id: i64,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||
Box::pin(async move {
|
||||
async move {
|
||||
let res = redis::pipe()
|
||||
.atomic()
|
||||
.get(chat_id)
|
||||
|
@ -80,7 +80,7 @@ where
|
|||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
})
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
fn update_dialogue(
|
||||
|
@ -88,7 +88,7 @@ where
|
|||
chat_id: i64,
|
||||
dialogue: D,
|
||||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
|
||||
Box::pin(async move {
|
||||
async move {
|
||||
let dialogue =
|
||||
self.serializer.serialize(&dialogue).map_err(RedisStorageError::SerdeError)?;
|
||||
Ok(self
|
||||
|
@ -99,6 +99,7 @@ where
|
|||
.await?
|
||||
.map(|d| self.serializer.deserialize(&d).map_err(RedisStorageError::SerdeError))
|
||||
.transpose()?)
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::future::Future;
|
||||
|
||||
use crate::dispatching::{DispatcherHandlerRx, UpdateWithCx};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
|
||||
/// An asynchronous handler of a stream of updates used in [`Dispatcher`].
|
||||
///
|
||||
|
@ -25,6 +25,6 @@ where
|
|||
where
|
||||
UpdateWithCx<Upd>: Send + 'static,
|
||||
{
|
||||
Box::pin(async move { self(updates).await })
|
||||
async move { self(updates).await }.boxed()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ where
|
|||
where
|
||||
Self: Stream<Item = UpdateWithCx<Message>>,
|
||||
{
|
||||
Box::pin(self.filter_map(|cx| async move { cx.update.text_owned().map(|text| (cx, text)) }))
|
||||
self.filter_map(|cx| async move { cx.update.text_owned().map(|text| (cx, text)) }).boxed()
|
||||
}
|
||||
|
||||
fn commands<C, N>(self, bot_name: N) -> BoxStream<'static, (UpdateWithCx<Message>, C)>
|
||||
|
@ -41,10 +41,12 @@ where
|
|||
{
|
||||
let bot_name = bot_name.into();
|
||||
|
||||
Box::pin(self.text_messages().filter_map(move |(cx, text)| {
|
||||
let bot_name = bot_name.clone();
|
||||
self.text_messages()
|
||||
.filter_map(move |(cx, text)| {
|
||||
let bot_name = bot_name.clone();
|
||||
|
||||
async move { C::parse(&text, &bot_name).map(|command| (cx, command)).ok() }
|
||||
}))
|
||||
async move { C::parse(&text, &bot_name).map(|command| (cx, command)).ok() }
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Convenient error handling.
|
||||
|
||||
use futures::future::BoxFuture;
|
||||
use futures::{future::BoxFuture, FutureExt};
|
||||
use std::{convert::Infallible, fmt::Debug, future::Future, sync::Arc};
|
||||
|
||||
/// An asynchronous handler of an error.
|
||||
|
@ -19,7 +19,7 @@ where
|
|||
Fut: Future<Output = ()> + Send,
|
||||
{
|
||||
fn handle_error(self: Arc<Self>, error: E) -> BoxFuture<'static, ()> {
|
||||
Box::pin(async move { self(error).await })
|
||||
async move { self(error).await }.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,11 +81,12 @@ where
|
|||
Eh: ErrorHandler<E> + Send + Sync,
|
||||
Arc<Eh>: 'a,
|
||||
{
|
||||
Box::pin(async move {
|
||||
async move {
|
||||
if let Err(error) = self {
|
||||
eh.handle_error(error).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,7 +115,7 @@ impl IgnoringErrorHandler {
|
|||
|
||||
impl<E> ErrorHandler<E> for IgnoringErrorHandler {
|
||||
fn handle_error(self: Arc<Self>, _: E) -> BoxFuture<'static, ()> {
|
||||
Box::pin(async {})
|
||||
async {}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -160,7 +161,7 @@ impl IgnoringErrorHandlerSafe {
|
|||
#[allow(unreachable_code)]
|
||||
impl ErrorHandler<Infallible> for IgnoringErrorHandlerSafe {
|
||||
fn handle_error(self: Arc<Self>, _: Infallible) -> BoxFuture<'static, ()> {
|
||||
Box::pin(async {})
|
||||
async {}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -207,6 +208,6 @@ where
|
|||
{
|
||||
fn handle_error(self: Arc<Self>, error: E) -> BoxFuture<'static, ()> {
|
||||
log::error!("{text}: {:?}", error, text = self.text);
|
||||
Box::pin(async {})
|
||||
async {}.boxed()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue