From 445c42b585e386d314c81047c1db8a49a72e54b1 Mon Sep 17 00:00:00 2001 From: Waffle Date: Sat, 12 Dec 2020 15:52:47 +0300 Subject: [PATCH] Refactor file downloading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Make `net` module public - Move `Bot::download_file{,_stream}` methods to a new `Download` trait - Add `download_forward` macro to forward `Download` impls (priv) - Impl `Download` for all bot adaptors & the `Bot` itself - Change return type of `download_file_stream` — return `Stream>``, instead of `Future>>>`` - Add `api_url` param to standalone versions of `download_file{,_stream}` - Make `net::{TELEGRAM_API_URL, download_file{,_stream}}` pub - Small documentation changes --- src/adaptors/auto_send.rs | 7 ++ src/adaptors/cache_me.rs | 7 ++ src/adaptors/parse_mode.rs | 7 ++ src/adaptors/throttle.rs | 7 ++ src/bot/download.rs | 73 +++++------------ src/lib.rs | 2 +- src/local_macros.rs | 31 ++++++++ src/net/download.rs | 157 +++++++++++++++++++++++++++---------- src/net/mod.rs | 12 ++- src/requests/multipart.rs | 2 +- src/types/file.rs | 15 ++-- 11 files changed, 217 insertions(+), 103 deletions(-) diff --git a/src/adaptors/auto_send.rs b/src/adaptors/auto_send.rs index f5c71e61..54a85d87 100644 --- a/src/adaptors/auto_send.rs +++ b/src/adaptors/auto_send.rs @@ -96,6 +96,13 @@ impl Requester for AutoSend { } } +download_forward! { + 'w + B + AutoSend + { this => this.inner() } +} + #[pin_project::pin_project] pub struct AutoRequest(#[pin] Inner); diff --git a/src/adaptors/cache_me.rs b/src/adaptors/cache_me.rs index 3f8000fe..8f54a2ee 100644 --- a/src/adaptors/cache_me.rs +++ b/src/adaptors/cache_me.rs @@ -113,6 +113,13 @@ where } } +download_forward! { + 'w + B + CacheMe + { this => this.inner() } +} + pub struct CachedMeRequest>(Inner, GetMe); enum Inner> { diff --git a/src/adaptors/parse_mode.rs b/src/adaptors/parse_mode.rs index e191e73d..ce4814d9 100644 --- a/src/adaptors/parse_mode.rs +++ b/src/adaptors/parse_mode.rs @@ -106,3 +106,10 @@ impl Requester for DefaultParseMode { set_sticker_set_thumb, send_invoice, answer_shipping_query, answer_pre_checkout_query, set_passport_data_errors => fid, fty } } + +download_forward! { + 'w + B + DefaultParseMode + { this => this.inner() } +} diff --git a/src/adaptors/throttle.rs b/src/adaptors/throttle.rs index 2401e04c..e7e2250c 100644 --- a/src/adaptors/throttle.rs +++ b/src/adaptors/throttle.rs @@ -457,6 +457,13 @@ where } } +download_forward! { + 'w + B + Throttle + { this => this.inner() } +} + /// Id used in worker. /// /// It is used instead of `ChatId` to make copying cheap even in case of diff --git a/src/bot/download.rs b/src/bot/download.rs index bc859df7..dad76b31 100644 --- a/src/bot/download.rs +++ b/src/bot/download.rs @@ -1,66 +1,33 @@ use bytes::Bytes; -use tokio::{io::AsyncWrite, stream::Stream}; +use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; +use tokio::io::AsyncWrite; use crate::{ bot::Bot, - net::{download_file, download_file_stream}, + net::{self, Download}, DownloadError, }; -impl Bot { - /// Download a file from Telegram into `destination`. - /// - /// `path` can be obtained from [`GetFile`]. - /// - /// To download as a stream of chunks, see [`Bot::download_file_stream`]. - /// - /// ## Examples - /// - /// ```no_run - /// use teloxide_core::{ - /// requests::{Request, Requester}, - /// types::File as TgFile, - /// Bot, - /// }; - /// use tokio::fs::File; - /// - /// # async fn run() -> Result<(), Box> { - /// let bot = Bot::new("TOKEN"); - /// - /// let TgFile { file_path, .. } = bot.get_file("*file_id*").send().await?; - /// let mut file = File::create("/home/waffle/Pictures/test.png").await?; - /// bot.download_file(&file_path, &mut file).await?; - /// # Ok(()) } - /// ``` - /// - /// [`GetFile`]: crate::payloads::GetFile - /// [`Bot::download_file_stream`]: crate::Bot::download_file_stream - pub async fn download_file( +impl<'w> Download<'w> for Bot { + type Err = DownloadError; + + // I would like to unbox this, but my coworkers will kill me if they'll see yet + // another hand written `Future`. (waffle) + type Fut = BoxFuture<'w, Result<(), Self::Err>>; + + fn download_file( &self, path: &str, - destination: &mut D, - ) -> Result<(), DownloadError> - where - D: AsyncWrite + Unpin, - { - download_file(&self.client, &self.token, path, destination).await + destination: &'w mut (dyn AsyncWrite + Unpin + Send), + ) -> Self::Fut { + net::download_file(&self.client, self.api_url.get(), &self.token, path, destination).boxed() } - /// Download a file from Telegram. - /// - /// `path` can be obtained from the [`GetFile`]. - /// - /// To download into [`AsyncWrite`] (e.g. [`tokio::fs::File`]), see - /// [`Bot::download_file`]. - /// - /// [`GetFile`]: crate::payloads::GetFile - /// [`AsyncWrite`]: tokio::io::AsyncWrite - /// [`tokio::fs::File`]: tokio::fs::File - /// [`Bot::download_file`]: crate::Bot::download_file - pub async fn download_file_stream( - &self, - path: &str, - ) -> Result>, reqwest::Error> { - download_file_stream(&self.client, &self.token, path).await + type StreamErr = reqwest::Error; + + type Stream = BoxStream<'static, Result>; + + fn download_file_stream(&self, path: &str) -> Self::Stream { + net::download_file_stream(&self.client, self.api_url.get(), &self.token, path).boxed() } } diff --git a/src/lib.rs b/src/lib.rs index 54d79c30..7b33b931 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ pub use self::{ }; pub mod adaptors; +pub mod net; pub mod payloads; pub mod prelude; pub mod requests; @@ -34,7 +35,6 @@ mod bot; mod errors; // implementation details -mod net; mod serde_multipart; /// Constructs a client from the `TELOXIDE_PROXY` environmental variable. diff --git a/src/local_macros.rs b/src/local_macros.rs index 83bf0f57..51be0d62 100644 --- a/src/local_macros.rs +++ b/src/local_macros.rs @@ -995,3 +995,34 @@ macro_rules! requester_forward { } }; } + +#[macro_use] +macro_rules! download_forward { + ($l:lifetime $T:ident $S:ty {$this:ident => $inner:expr}) => { + impl<$l, $T: crate::net::Download<$l>> crate::net::Download<$l> for $S { + type Err = <$T as crate::net::Download<$l>>::Err; + + type Fut = <$T as crate::net::Download<$l>>::Fut; + + fn download_file( + &self, + path: &str, + destination: &'w mut (dyn tokio::io::AsyncWrite + + core::marker::Unpin + + core::marker::Send), + ) -> Self::Fut { + let $this = self; + ($inner).download_file(path, destination) + } + + type StreamErr = <$T as crate::net::Download<$l>>::StreamErr; + + type Stream = <$T as crate::net::Download<$l>>::Stream; + + fn download_file_stream(&self, path: &str) -> Self::Stream { + let $this = self; + ($inner).download_file_stream(path) + } + } + }; +} diff --git a/src/net/download.rs b/src/net/download.rs index 3a507ad2..d19de588 100644 --- a/src/net/download.rs +++ b/src/net/download.rs @@ -1,54 +1,131 @@ -use reqwest::Client; +use std::future::Future; + +use bytes::Bytes; +use futures::{ + future::{ready, Either}, + stream::{once, unfold}, + FutureExt, Stream, StreamExt, +}; +use reqwest::{Client, Response, Url}; use tokio::io::{AsyncWrite, AsyncWriteExt}; -use crate::errors::DownloadError; +use crate::{errors::DownloadError, net::file_url}; -pub async fn download_file( - client: &Client, - token: &str, - path: &str, - destination: &mut D, -) -> Result<(), DownloadError> -where - D: AsyncWrite + Unpin, +/// Download client. +/// +/// This trait allows you to download files from telegram. +pub trait Download<'w> +/* FIXME(waffle): ideally, this lifetime ('w) shouldn't be here, but we can't help it without + * GATs */ { - let mut res = client - .get(crate::net::file_url( - reqwest::Url::parse(crate::net::TELEGRAM_API_URL).expect("failed to parse default url"), - token, - path, - )) - .send() - .await? - .error_for_status()?; + /// Error returned by [`download_file`](Self::download_file) + type Err; - while let Some(chunk) = res.chunk().await? { - destination.write_all(&chunk).await?; - } + /// Future returned from [`download_file`](Self::download_file) + type Fut: Future> + Send; - Ok(()) + /// Download a file from Telegram into `destination`. + /// + /// `path` can be obtained from [`GetFile`]. + /// + /// To download as a stream of chunks, see [`download_file_stream`]. + /// + /// ## Examples + /// + /// ```no_run + /// use teloxide_core::{ + /// requests::{Download, Request, Requester}, + /// types::File as TgFile, + /// Bot, + /// }; + /// use tokio::fs::File; + /// + /// # async fn run() -> Result<(), Box> { + /// let bot = Bot::new("TOKEN"); + /// + /// let TgFile { file_path, .. } = bot.get_file("*file_id*").send().await?; + /// let mut file = File::create("/tmp/test.png").await?; + /// bot.download_file(&file_path, &mut file).await?; + /// # Ok(()) } + /// ``` + /// + /// [`GetFile`]: crate::payloads::GetFile + /// [`download_file_stream`]: Self::download_file_stream + fn download_file( + &self, + path: &str, + destination: &'w mut (dyn AsyncWrite + Unpin + Send), + ) -> Self::Fut; + + /// Error returned by [`download_file_stream`](Self::download_file_stream) + type StreamErr; + + /// Stream returned from [`download_file_stream`] + /// + ///[`download_file_stream`]: (Self::download_file_stream) + type Stream: Stream> + Send; + + /// Download a file from Telegram as a [`Stream`]. + /// + /// `path` can be obtained from the [`GetFile`]. + /// + /// To download into an [`AsyncWrite`] (e.g. [`tokio::fs::File`]), see + /// [`download_file`]. + /// + /// [`GetFile`]: crate::payloads::GetFile + /// [`AsyncWrite`]: tokio::io::AsyncWrite + /// [`tokio::fs::File`]: tokio::fs::File + /// [`download_file`]: Self::download_file + fn download_file_stream(&self, path: &str) -> Self::Stream; } -pub async fn download_file_stream( +/// Download a file from Telegram into `dst`. +/// +/// Note: if you don't need to use a different (from you're bot) client and +/// don't need to get *all* performance (and you don't, c'mon it's very io-bound +/// job), then it's recommended to use [`Download::download_file`] +pub fn download_file<'o, D>( client: &Client, + api_url: Url, token: &str, path: &str, -) -> Result>, reqwest::Error> { - let res = client - .get(crate::net::file_url( - reqwest::Url::parse(crate::net::TELEGRAM_API_URL).expect("failed to parse default url"), - token, - path, - )) - .send() - .await? - .error_for_status()?; + dst: &'o mut D, +) -> impl Future> + 'o +where + D: ?Sized + AsyncWrite + Unpin, +{ + client.get(file_url(api_url, token, path)).send().then(move |r| async move { + let mut res = r?.error_for_status()?; - Ok(futures::stream::unfold(res, |mut res| async { - match res.chunk().await { - Err(err) => Some((Err(err), res)), - Ok(Some(c)) => Some((Ok(c), res)), - Ok(None) => None, + while let Some(chunk) = res.chunk().await? { + dst.write_all(&chunk).await?; } - })) + + Ok(()) + }) +} + +/// Download a file from Telegram as a [`Stream`]. +/// +/// Note: if you don't need to use a different (from you're bot) client and +/// don't need to get *all* performance (and you don't, c'mon it's very io-bound +/// job), then it's recommended to use [`Download::download_file_stream`] +pub fn download_file_stream( + client: &Client, + api_url: Url, + token: &str, + path: &str, +) -> impl Stream> + 'static { + client.get(file_url(api_url, token, path)).send().into_stream().flat_map(|res| { + match res.and_then(Response::error_for_status) { + Ok(res) => Either::Left(unfold(res, |mut res| async { + match res.chunk().await { + Err(err) => Some((Err(err), res)), + Ok(Some(c)) => Some((Ok(c), res)), + Ok(None) => None, + } + })), + Err(err) => Either::Right(once(ready(Err(err)))), + } + }) } diff --git a/src/net/mod.rs b/src/net/mod.rs index a6699ca5..cac0f366 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,5 +1,12 @@ +//! Network specific items. +//! +//! Currently this module contains only the file download stuff and the default +//! api url. +// ... and some internal stuff :P + +pub use self::download::{download_file, download_file_stream, Download}; + pub(crate) use self::{ - download::{download_file, download_file_stream}, request::{request_json, request_multipart}, telegram_response::TelegramResponse, }; @@ -8,7 +15,8 @@ mod download; mod request; mod telegram_response; -pub(crate) const TELEGRAM_API_URL: &str = "https://api.telegram.org"; +/// Default telegram api url +pub const TELEGRAM_API_URL: &str = "https://api.telegram.org"; /// Creates URL for making HTTPS requests. See the [Telegram documentation]. /// diff --git a/src/requests/multipart.rs b/src/requests/multipart.rs index 009271bd..41339055 100644 --- a/src/requests/multipart.rs +++ b/src/requests/multipart.rs @@ -18,7 +18,7 @@ pub struct MultipartRequest

{ } impl

MultipartRequest

{ - pub fn new(bot: Bot, payload: P) -> Self { + pub const fn new(bot: Bot, payload: P) -> Self { Self { bot, payload } } } diff --git a/src/types/file.rs b/src/types/file.rs index 86908109..a1eeeb75 100644 --- a/src/types/file.rs +++ b/src/types/file.rs @@ -2,13 +2,15 @@ use serde::{Deserialize, Serialize}; /// This object represents a file ready to be downloaded. /// -/// The file can be downloaded via the link `https://api.telegram.org/file/bot/`. -/// It is guaranteed that the link will be valid for at least 1 hour. When the -/// link expires, a new one can be requested by calling [`GetFile`]. +/// The file can be downloaded via the [`Bot::download_file(file_path, dst)`] +/// method. It is guaranteed that the path from [`GetFile`] will be valid for at +/// least 1 hour. When the path expires, a new one can be requested by calling +/// [`GetFile`]. /// /// [The official docs](https://core.telegram.org/bots/api#file). /// /// [`GetFile`]: crate::payloads::GetFile +/// [`Bot::download_file(file_path, dst)`]: crate::net::Download::download_file #[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)] pub struct File { /// Identifier for this file. @@ -22,9 +24,10 @@ pub struct File { /// File size, if known. pub file_size: u32, - // TODO: chacge "Use ..." to use bot.download... - /// File path. Use `https://api.telegram.org/file/bot/` - /// to get the file. + /// File path. Use [`Bot::download_file(file_path, dst)`] to get the file. + /// + /// [`Bot::download_file(file_path, dst)`]: + /// crate::net::Download::download_file pub file_path: String, }