mirror of
https://github.com/teloxide/teloxide.git
synced 2025-01-24 09:16:12 +01:00
commit
ae0454a716
4 changed files with 98 additions and 108 deletions
|
@ -186,81 +186,79 @@ impl<'a, HandlerE, Eh> FilterDispatcher<'a, HandlerE, Eh> {
|
|||
Eh: ErrorHandler<Either<UpdaterE, HandlerE>>,
|
||||
{
|
||||
updater
|
||||
.for_each_concurrent(None, |res| {
|
||||
async {
|
||||
let Update { kind, id } = match res {
|
||||
Ok(upd) => upd,
|
||||
Err(err) => {
|
||||
self.error_handler
|
||||
.handle_error(Either::Left(err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
.for_each_concurrent(None, |res| async {
|
||||
let Update { kind, id } = match res {
|
||||
Ok(upd) => upd,
|
||||
Err(err) => {
|
||||
self.error_handler
|
||||
.handle_error(Either::Left(err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
log::debug!(
|
||||
"Handled update#{id:?}: {kind:?}",
|
||||
id = id,
|
||||
kind = kind
|
||||
);
|
||||
log::debug!(
|
||||
"Handled update#{id:?}: {kind:?}",
|
||||
id = id,
|
||||
kind = kind
|
||||
);
|
||||
|
||||
match kind {
|
||||
UpdateKind::Message(mes) => {
|
||||
Self::handle(
|
||||
mes,
|
||||
&self.message_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await
|
||||
}
|
||||
UpdateKind::EditedMessage(mes) => {
|
||||
Self::handle(
|
||||
mes,
|
||||
&self.edited_message_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::ChannelPost(post) => {
|
||||
Self::handle(
|
||||
post,
|
||||
&self.channel_post_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::EditedChannelPost(post) => {
|
||||
Self::handle(
|
||||
post,
|
||||
&self.edited_channel_post_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::InlineQuery(query) => {
|
||||
Self::handle(
|
||||
query,
|
||||
&self.inline_query_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::ChosenInlineResult(result) => {
|
||||
Self::handle(
|
||||
result,
|
||||
&self.chosen_inline_result_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::CallbackQuery(callback) => {
|
||||
Self::handle(
|
||||
callback,
|
||||
&self.callback_query_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
match kind {
|
||||
UpdateKind::Message(mes) => {
|
||||
Self::handle(
|
||||
mes,
|
||||
&self.message_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await
|
||||
}
|
||||
UpdateKind::EditedMessage(mes) => {
|
||||
Self::handle(
|
||||
mes,
|
||||
&self.edited_message_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::ChannelPost(post) => {
|
||||
Self::handle(
|
||||
post,
|
||||
&self.channel_post_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::EditedChannelPost(post) => {
|
||||
Self::handle(
|
||||
post,
|
||||
&self.edited_channel_post_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::InlineQuery(query) => {
|
||||
Self::handle(
|
||||
query,
|
||||
&self.inline_query_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::ChosenInlineResult(result) => {
|
||||
Self::handle(
|
||||
result,
|
||||
&self.chosen_inline_result_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
UpdateKind::CallbackQuery(callback) => {
|
||||
Self::handle(
|
||||
callback,
|
||||
&self.callback_query_handlers,
|
||||
&self.error_handler,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -310,17 +308,13 @@ mod tests {
|
|||
let counter2 = &AtomicI32::new(0);
|
||||
|
||||
let mut dp = FilterDispatcher::<Infallible, _>::new(|_| async {})
|
||||
.message_handler(true, |_mes: Message| {
|
||||
async move {
|
||||
counter.fetch_add(1, Ordering::SeqCst);
|
||||
Ok::<_, Infallible>(())
|
||||
}
|
||||
.message_handler(true, |_mes: Message| async move {
|
||||
counter.fetch_add(1, Ordering::SeqCst);
|
||||
Ok::<_, Infallible>(())
|
||||
})
|
||||
.message_handler(true, |_mes: Message| {
|
||||
async move {
|
||||
counter2.fetch_add(1, Ordering::SeqCst);
|
||||
Ok::<_, Infallible>(())
|
||||
}
|
||||
.message_handler(true, |_mes: Message| async move {
|
||||
counter2.fetch_add(1, Ordering::SeqCst);
|
||||
Ok::<_, Infallible>(())
|
||||
});
|
||||
|
||||
dp.dispatch(one_message_updater()).await;
|
||||
|
|
|
@ -146,25 +146,23 @@ pub fn polling(
|
|||
|
||||
stream::unfold(
|
||||
(allowed_updates, bot, 0),
|
||||
move |(mut allowed_updates, bot, mut offset)| {
|
||||
async move {
|
||||
let mut req = bot.get_updates().offset(offset);
|
||||
req.timeout = timeout;
|
||||
req.limit = limit;
|
||||
req.allowed_updates = allowed_updates.take();
|
||||
move |(mut allowed_updates, bot, mut offset)| async move {
|
||||
let mut req = bot.get_updates().offset(offset);
|
||||
req.timeout = timeout;
|
||||
req.limit = limit;
|
||||
req.allowed_updates = allowed_updates.take();
|
||||
|
||||
let updates = match req.send().await {
|
||||
Err(err) => vec![Err(err)],
|
||||
Ok(updates) => {
|
||||
if let Some(upd) = updates.last() {
|
||||
offset = upd.id + 1;
|
||||
}
|
||||
updates.into_iter().map(Ok).collect::<Vec<_>>()
|
||||
let updates = match req.send().await {
|
||||
Err(err) => vec![Err(err)],
|
||||
Ok(updates) => {
|
||||
if let Some(upd) = updates.last() {
|
||||
offset = upd.id + 1;
|
||||
}
|
||||
};
|
||||
updates.into_iter().map(Ok).collect::<Vec<_>>()
|
||||
}
|
||||
};
|
||||
|
||||
Some((stream::iter(updates), (allowed_updates, bot, offset)))
|
||||
}
|
||||
Some((stream::iter(updates), (allowed_updates, bot, offset)))
|
||||
},
|
||||
)
|
||||
.flatten()
|
||||
|
|
|
@ -39,13 +39,11 @@ pub async fn download_file_stream(
|
|||
.await?
|
||||
.error_for_status()?;
|
||||
|
||||
Ok(futures::stream::unfold(res, |mut res| {
|
||||
async {
|
||||
match res.chunk().await {
|
||||
Err(err) => Some((Err(err), res)),
|
||||
Ok(Some(c)) => Some((Ok(c), res)),
|
||||
Ok(None) => None,
|
||||
}
|
||||
Ok(futures::stream::unfold(res, |mut res| async {
|
||||
match res.chunk().await {
|
||||
Err(err) => Some((Err(err), res)),
|
||||
Ok(Some(c)) => Some((Ok(c), res)),
|
||||
Ok(None) => None,
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue