Merge pull request #30 from teloxide/requester_download

Refactor file downloading
This commit is contained in:
Waffle Lapkin 2020-12-12 19:23:01 +03:00 committed by GitHub
commit bb9b2fb8b5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 222 additions and 106 deletions

View file

@ -96,6 +96,13 @@ impl<B: Requester> Requester for AutoSend<B> {
}
}
download_forward! {
'w
B
AutoSend<B>
{ this => this.inner() }
}
#[pin_project::pin_project]
pub struct AutoRequest<R: Request>(#[pin] Inner<R>);

View file

@ -113,6 +113,13 @@ where
}
}
download_forward! {
'w
B
CacheMe<B>
{ this => this.inner() }
}
pub struct CachedMeRequest<R: Request<Payload = GetMe>>(Inner<R>, GetMe);
enum Inner<R: Request<Payload = GetMe>> {

View file

@ -106,3 +106,10 @@ impl<B: Requester> Requester for DefaultParseMode<B> {
set_sticker_set_thumb, send_invoice, answer_shipping_query, answer_pre_checkout_query, set_passport_data_errors => fid, fty
}
}
download_forward! {
'w
B
DefaultParseMode<B>
{ this => this.inner() }
}

View file

@ -457,6 +457,13 @@ where
}
}
download_forward! {
'w
B
Throttle<B>
{ this => this.inner() }
}
/// Id used in worker.
///
/// It is used instead of `ChatId` to make copying cheap even in case of

View file

@ -1,66 +1,33 @@
use bytes::Bytes;
use tokio::{io::AsyncWrite, stream::Stream};
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt};
use tokio::io::AsyncWrite;
use crate::{
bot::Bot,
net::{download_file, download_file_stream},
net::{self, Download},
DownloadError,
};
impl Bot {
/// Download a file from Telegram into `destination`.
///
/// `path` can be obtained from [`GetFile`].
///
/// To download as a stream of chunks, see [`Bot::download_file_stream`].
///
/// ## Examples
///
/// ```no_run
/// use teloxide_core::{
/// requests::{Request, Requester},
/// types::File as TgFile,
/// Bot,
/// };
/// use tokio::fs::File;
///
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let bot = Bot::new("TOKEN");
///
/// let TgFile { file_path, .. } = bot.get_file("*file_id*").send().await?;
/// let mut file = File::create("/home/waffle/Pictures/test.png").await?;
/// bot.download_file(&file_path, &mut file).await?;
/// # Ok(()) }
/// ```
///
/// [`GetFile`]: crate::payloads::GetFile
/// [`Bot::download_file_stream`]: crate::Bot::download_file_stream
pub async fn download_file<D>(
impl<'w> Download<'w> for Bot {
type Err = DownloadError;
// I would like to unbox this, but my coworkers will kill me if they'll see yet
// another hand written `Future`. (waffle)
type Fut = BoxFuture<'w, Result<(), Self::Err>>;
fn download_file(
&self,
path: &str,
destination: &mut D,
) -> Result<(), DownloadError>
where
D: AsyncWrite + Unpin,
{
download_file(&self.client, &self.token, path, destination).await
destination: &'w mut (dyn AsyncWrite + Unpin + Send),
) -> Self::Fut {
net::download_file(&self.client, self.api_url.get(), &self.token, path, destination).boxed()
}
/// Download a file from Telegram.
///
/// `path` can be obtained from the [`GetFile`].
///
/// To download into [`AsyncWrite`] (e.g. [`tokio::fs::File`]), see
/// [`Bot::download_file`].
///
/// [`GetFile`]: crate::payloads::GetFile
/// [`AsyncWrite`]: tokio::io::AsyncWrite
/// [`tokio::fs::File`]: tokio::fs::File
/// [`Bot::download_file`]: crate::Bot::download_file
pub async fn download_file_stream(
&self,
path: &str,
) -> Result<impl Stream<Item = Result<Bytes, reqwest::Error>>, reqwest::Error> {
download_file_stream(&self.client, &self.token, path).await
type StreamErr = reqwest::Error;
type Stream = BoxStream<'static, Result<Bytes, Self::StreamErr>>;
fn download_file_stream(&self, path: &str) -> Self::Stream {
net::download_file_stream(&self.client, self.api_url.get(), &self.token, path).boxed()
}
}

View file

@ -24,6 +24,7 @@ pub use self::{
};
pub mod adaptors;
pub mod net;
pub mod payloads;
pub mod prelude;
pub mod requests;
@ -34,7 +35,6 @@ mod bot;
mod errors;
// implementation details
mod net;
mod serde_multipart;
/// Constructs a client from the `TELOXIDE_PROXY` environmental variable.

View file

@ -995,3 +995,34 @@ macro_rules! requester_forward {
}
};
}
#[macro_use]
macro_rules! download_forward {
($l:lifetime $T:ident $S:ty {$this:ident => $inner:expr}) => {
impl<$l, $T: crate::net::Download<$l>> crate::net::Download<$l> for $S {
type Err = <$T as crate::net::Download<$l>>::Err;
type Fut = <$T as crate::net::Download<$l>>::Fut;
fn download_file(
&self,
path: &str,
destination: &'w mut (dyn tokio::io::AsyncWrite
+ core::marker::Unpin
+ core::marker::Send),
) -> Self::Fut {
let $this = self;
($inner).download_file(path, destination)
}
type StreamErr = <$T as crate::net::Download<$l>>::StreamErr;
type Stream = <$T as crate::net::Download<$l>>::Stream;
fn download_file_stream(&self, path: &str) -> Self::Stream {
let $this = self;
($inner).download_file_stream(path)
}
}
};
}

View file

@ -1,54 +1,132 @@
use reqwest::Client;
use std::future::Future;
use bytes::Bytes;
use futures::{
future::{ready, Either},
stream::{once, unfold},
FutureExt, Stream, StreamExt,
};
use reqwest::{Client, Response, Url};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use crate::errors::DownloadError;
use crate::{errors::DownloadError, net::file_url};
pub async fn download_file<D>(
client: &Client,
token: &str,
path: &str,
destination: &mut D,
) -> Result<(), DownloadError>
where
D: AsyncWrite + Unpin,
/// Download client.
///
/// This trait allows you to download files from telegram.
pub trait Download<'w>
/* FIXME(waffle): ideally, this lifetime ('w) shouldn't be here, but we can't help it without
* GATs */
{
let mut res = client
.get(crate::net::file_url(
reqwest::Url::parse(crate::net::TELEGRAM_API_URL).expect("failed to parse default url"),
token,
path,
))
.send()
.await?
.error_for_status()?;
/// Error returned by [`download_file`](Self::download_file)
type Err;
while let Some(chunk) = res.chunk().await? {
destination.write_all(&chunk).await?;
}
/// Future returned from [`download_file`](Self::download_file)
type Fut: Future<Output = Result<(), Self::Err>> + Send;
Ok(())
/// Download a file from Telegram into `destination`.
///
/// `path` can be obtained from [`GetFile`].
///
/// To download as a stream of chunks, see [`download_file_stream`].
///
/// ## Examples
///
/// ```no_run
/// use teloxide_core::{
/// net::Download,
/// requests::{Request, Requester},
/// types::File as TgFile,
/// Bot,
/// };
/// use tokio::fs::File;
///
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let bot = Bot::new("TOKEN");
///
/// let TgFile { file_path, .. } = bot.get_file("*file_id*").send().await?;
/// let mut file = File::create("/tmp/test.png").await?;
/// bot.download_file(&file_path, &mut file).await?;
/// # Ok(()) }
/// ```
///
/// [`GetFile`]: crate::payloads::GetFile
/// [`download_file_stream`]: Self::download_file_stream
fn download_file(
&self,
path: &str,
destination: &'w mut (dyn AsyncWrite + Unpin + Send),
) -> Self::Fut;
/// Error returned by [`download_file_stream`](Self::download_file_stream)
type StreamErr;
/// Stream returned from [`download_file_stream`]
///
///[`download_file_stream`]: (Self::download_file_stream)
type Stream: Stream<Item = Result<Bytes, Self::StreamErr>> + Send;
/// Download a file from Telegram as a [`Stream`].
///
/// `path` can be obtained from the [`GetFile`].
///
/// To download into an [`AsyncWrite`] (e.g. [`tokio::fs::File`]), see
/// [`download_file`].
///
/// [`GetFile`]: crate::payloads::GetFile
/// [`AsyncWrite`]: tokio::io::AsyncWrite
/// [`tokio::fs::File`]: tokio::fs::File
/// [`download_file`]: Self::download_file
fn download_file_stream(&self, path: &str) -> Self::Stream;
}
pub async fn download_file_stream(
/// Download a file from Telegram into `dst`.
///
/// Note: if you don't need to use a different (from you're bot) client and
/// don't need to get *all* performance (and you don't, c'mon it's very io-bound
/// job), then it's recommended to use [`Download::download_file`]
pub fn download_file<'o, D>(
client: &Client,
api_url: Url,
token: &str,
path: &str,
) -> Result<impl futures::Stream<Item = reqwest::Result<bytes::Bytes>>, reqwest::Error> {
let res = client
.get(crate::net::file_url(
reqwest::Url::parse(crate::net::TELEGRAM_API_URL).expect("failed to parse default url"),
token,
path,
))
.send()
.await?
.error_for_status()?;
dst: &'o mut D,
) -> impl Future<Output = Result<(), DownloadError>> + 'o
where
D: ?Sized + AsyncWrite + Unpin,
{
client.get(file_url(api_url, token, path)).send().then(move |r| async move {
let mut res = r?.error_for_status()?;
Ok(futures::stream::unfold(res, |mut res| async {
match res.chunk().await {
Err(err) => Some((Err(err), res)),
Ok(Some(c)) => Some((Ok(c), res)),
Ok(None) => None,
while let Some(chunk) = res.chunk().await? {
dst.write_all(&chunk).await?;
}
}))
Ok(())
})
}
/// Download a file from Telegram as a [`Stream`].
///
/// Note: if you don't need to use a different (from you're bot) client and
/// don't need to get *all* performance (and you don't, c'mon it's very io-bound
/// job), then it's recommended to use [`Download::download_file_stream`]
pub fn download_file_stream(
client: &Client,
api_url: Url,
token: &str,
path: &str,
) -> impl Stream<Item = reqwest::Result<Bytes>> + 'static {
client.get(file_url(api_url, token, path)).send().into_stream().flat_map(|res| {
match res.and_then(Response::error_for_status) {
Ok(res) => Either::Left(unfold(res, |mut res| async {
match res.chunk().await {
Err(err) => Some((Err(err), res)),
Ok(Some(c)) => Some((Ok(c), res)),
Ok(None) => None,
}
})),
Err(err) => Either::Right(once(ready(Err(err)))),
}
})
}

View file

@ -1,5 +1,12 @@
//! Network specific items.
//!
//! Currently this module contains only the file download stuff and the default
//! api url.
// ... and some internal stuff :P
pub use self::download::{download_file, download_file_stream, Download};
pub(crate) use self::{
download::{download_file, download_file_stream},
request::{request_json, request_multipart},
telegram_response::TelegramResponse,
};
@ -8,7 +15,8 @@ mod download;
mod request;
mod telegram_response;
pub(crate) const TELEGRAM_API_URL: &str = "https://api.telegram.org";
/// Default telegram api url
pub const TELEGRAM_API_URL: &str = "https://api.telegram.org";
/// Creates URL for making HTTPS requests. See the [Telegram documentation].
///

View file

@ -62,9 +62,10 @@ mod tests {
let s = r#"{"ok":false,"error_code":409,"description":"Conflict: terminated by other getUpdates request; make sure that only one bot instance is running"}"#;
let val = serde_json::from_str::<TelegramResponse<Update>>(s).unwrap();
assert!(
matches!(val, TelegramResponse::Err { error: ApiError::TerminatedByOtherGetUpdates, .. })
);
assert!(matches!(
val,
TelegramResponse::Err { error: ApiError::TerminatedByOtherGetUpdates, .. }
));
}
#[test]

View file

@ -18,7 +18,7 @@ pub struct MultipartRequest<P> {
}
impl<P> MultipartRequest<P> {
pub fn new(bot: Bot, payload: P) -> Self {
pub const fn new(bot: Bot, payload: P) -> Self {
Self { bot, payload }
}
}

View file

@ -2,13 +2,15 @@ use serde::{Deserialize, Serialize};
/// This object represents a file ready to be downloaded.
///
/// The file can be downloaded via the link `https://api.telegram.org/file/bot<token>/<file_path>`.
/// It is guaranteed that the link will be valid for at least 1 hour. When the
/// link expires, a new one can be requested by calling [`GetFile`].
/// The file can be downloaded via the [`Bot::download_file(file_path, dst)`]
/// method. It is guaranteed that the path from [`GetFile`] will be valid for at
/// least 1 hour. When the path expires, a new one can be requested by calling
/// [`GetFile`].
///
/// [The official docs](https://core.telegram.org/bots/api#file).
///
/// [`GetFile`]: crate::payloads::GetFile
/// [`Bot::download_file(file_path, dst)`]: crate::net::Download::download_file
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct File {
/// Identifier for this file.
@ -22,9 +24,10 @@ pub struct File {
/// File size, if known.
pub file_size: u32,
// TODO: chacge "Use ..." to use bot.download...
/// File path. Use `https://api.telegram.org/file/bot<token>/<file_path>`
/// to get the file.
/// File path. Use [`Bot::download_file(file_path, dst)`] to get the file.
///
/// [`Bot::download_file(file_path, dst)`]:
/// crate::net::Download::download_file
pub file_path: String,
}