mirror of
https://github.com/teloxide/teloxide.git
synced 2025-01-09 03:43:22 +01:00
Refactor file downloading
- Make `net` module public - Move `Bot::download_file{,_stream}` methods to a new `Download` trait - Add `download_forward` macro to forward `Download` impls (priv) - Impl `Download` for all bot adaptors & the `Bot` itself - Change return type of `download_file_stream` — return `Stream<Result<Bytes>>``, instead of `Future<Result<Stream<Result<Bytes>>>>`` - Add `api_url` param to standalone versions of `download_file{,_stream}` - Make `net::{TELEGRAM_API_URL, download_file{,_stream}}` pub - Small documentation changes
This commit is contained in:
parent
91015b7b3f
commit
445c42b585
11 changed files with 217 additions and 103 deletions
|
@ -96,6 +96,13 @@ impl<B: Requester> Requester for AutoSend<B> {
|
|||
}
|
||||
}
|
||||
|
||||
download_forward! {
|
||||
'w
|
||||
B
|
||||
AutoSend<B>
|
||||
{ this => this.inner() }
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct AutoRequest<R: Request>(#[pin] Inner<R>);
|
||||
|
||||
|
|
|
@ -113,6 +113,13 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
download_forward! {
|
||||
'w
|
||||
B
|
||||
CacheMe<B>
|
||||
{ this => this.inner() }
|
||||
}
|
||||
|
||||
pub struct CachedMeRequest<R: Request<Payload = GetMe>>(Inner<R>, GetMe);
|
||||
|
||||
enum Inner<R: Request<Payload = GetMe>> {
|
||||
|
|
|
@ -106,3 +106,10 @@ impl<B: Requester> Requester for DefaultParseMode<B> {
|
|||
set_sticker_set_thumb, send_invoice, answer_shipping_query, answer_pre_checkout_query, set_passport_data_errors => fid, fty
|
||||
}
|
||||
}
|
||||
|
||||
download_forward! {
|
||||
'w
|
||||
B
|
||||
DefaultParseMode<B>
|
||||
{ this => this.inner() }
|
||||
}
|
||||
|
|
|
@ -457,6 +457,13 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
download_forward! {
|
||||
'w
|
||||
B
|
||||
Throttle<B>
|
||||
{ this => this.inner() }
|
||||
}
|
||||
|
||||
/// Id used in worker.
|
||||
///
|
||||
/// It is used instead of `ChatId` to make copying cheap even in case of
|
||||
|
|
|
@ -1,66 +1,33 @@
|
|||
use bytes::Bytes;
|
||||
use tokio::{io::AsyncWrite, stream::Stream};
|
||||
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt};
|
||||
use tokio::io::AsyncWrite;
|
||||
|
||||
use crate::{
|
||||
bot::Bot,
|
||||
net::{download_file, download_file_stream},
|
||||
net::{self, Download},
|
||||
DownloadError,
|
||||
};
|
||||
|
||||
impl Bot {
|
||||
/// Download a file from Telegram into `destination`.
|
||||
///
|
||||
/// `path` can be obtained from [`GetFile`].
|
||||
///
|
||||
/// To download as a stream of chunks, see [`Bot::download_file_stream`].
|
||||
///
|
||||
/// ## Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use teloxide_core::{
|
||||
/// requests::{Request, Requester},
|
||||
/// types::File as TgFile,
|
||||
/// Bot,
|
||||
/// };
|
||||
/// use tokio::fs::File;
|
||||
///
|
||||
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let bot = Bot::new("TOKEN");
|
||||
///
|
||||
/// let TgFile { file_path, .. } = bot.get_file("*file_id*").send().await?;
|
||||
/// let mut file = File::create("/home/waffle/Pictures/test.png").await?;
|
||||
/// bot.download_file(&file_path, &mut file).await?;
|
||||
/// # Ok(()) }
|
||||
/// ```
|
||||
///
|
||||
/// [`GetFile`]: crate::payloads::GetFile
|
||||
/// [`Bot::download_file_stream`]: crate::Bot::download_file_stream
|
||||
pub async fn download_file<D>(
|
||||
impl<'w> Download<'w> for Bot {
|
||||
type Err = DownloadError;
|
||||
|
||||
// I would like to unbox this, but my coworkers will kill me if they'll see yet
|
||||
// another hand written `Future`. (waffle)
|
||||
type Fut = BoxFuture<'w, Result<(), Self::Err>>;
|
||||
|
||||
fn download_file(
|
||||
&self,
|
||||
path: &str,
|
||||
destination: &mut D,
|
||||
) -> Result<(), DownloadError>
|
||||
where
|
||||
D: AsyncWrite + Unpin,
|
||||
{
|
||||
download_file(&self.client, &self.token, path, destination).await
|
||||
destination: &'w mut (dyn AsyncWrite + Unpin + Send),
|
||||
) -> Self::Fut {
|
||||
net::download_file(&self.client, self.api_url.get(), &self.token, path, destination).boxed()
|
||||
}
|
||||
|
||||
/// Download a file from Telegram.
|
||||
///
|
||||
/// `path` can be obtained from the [`GetFile`].
|
||||
///
|
||||
/// To download into [`AsyncWrite`] (e.g. [`tokio::fs::File`]), see
|
||||
/// [`Bot::download_file`].
|
||||
///
|
||||
/// [`GetFile`]: crate::payloads::GetFile
|
||||
/// [`AsyncWrite`]: tokio::io::AsyncWrite
|
||||
/// [`tokio::fs::File`]: tokio::fs::File
|
||||
/// [`Bot::download_file`]: crate::Bot::download_file
|
||||
pub async fn download_file_stream(
|
||||
&self,
|
||||
path: &str,
|
||||
) -> Result<impl Stream<Item = Result<Bytes, reqwest::Error>>, reqwest::Error> {
|
||||
download_file_stream(&self.client, &self.token, path).await
|
||||
type StreamErr = reqwest::Error;
|
||||
|
||||
type Stream = BoxStream<'static, Result<Bytes, Self::StreamErr>>;
|
||||
|
||||
fn download_file_stream(&self, path: &str) -> Self::Stream {
|
||||
net::download_file_stream(&self.client, self.api_url.get(), &self.token, path).boxed()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ pub use self::{
|
|||
};
|
||||
|
||||
pub mod adaptors;
|
||||
pub mod net;
|
||||
pub mod payloads;
|
||||
pub mod prelude;
|
||||
pub mod requests;
|
||||
|
@ -34,7 +35,6 @@ mod bot;
|
|||
mod errors;
|
||||
|
||||
// implementation details
|
||||
mod net;
|
||||
mod serde_multipart;
|
||||
|
||||
/// Constructs a client from the `TELOXIDE_PROXY` environmental variable.
|
||||
|
|
|
@ -995,3 +995,34 @@ macro_rules! requester_forward {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_use]
|
||||
macro_rules! download_forward {
|
||||
($l:lifetime $T:ident $S:ty {$this:ident => $inner:expr}) => {
|
||||
impl<$l, $T: crate::net::Download<$l>> crate::net::Download<$l> for $S {
|
||||
type Err = <$T as crate::net::Download<$l>>::Err;
|
||||
|
||||
type Fut = <$T as crate::net::Download<$l>>::Fut;
|
||||
|
||||
fn download_file(
|
||||
&self,
|
||||
path: &str,
|
||||
destination: &'w mut (dyn tokio::io::AsyncWrite
|
||||
+ core::marker::Unpin
|
||||
+ core::marker::Send),
|
||||
) -> Self::Fut {
|
||||
let $this = self;
|
||||
($inner).download_file(path, destination)
|
||||
}
|
||||
|
||||
type StreamErr = <$T as crate::net::Download<$l>>::StreamErr;
|
||||
|
||||
type Stream = <$T as crate::net::Download<$l>>::Stream;
|
||||
|
||||
fn download_file_stream(&self, path: &str) -> Self::Stream {
|
||||
let $this = self;
|
||||
($inner).download_file_stream(path)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,54 +1,131 @@
|
|||
use reqwest::Client;
|
||||
use std::future::Future;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
future::{ready, Either},
|
||||
stream::{once, unfold},
|
||||
FutureExt, Stream, StreamExt,
|
||||
};
|
||||
use reqwest::{Client, Response, Url};
|
||||
use tokio::io::{AsyncWrite, AsyncWriteExt};
|
||||
|
||||
use crate::errors::DownloadError;
|
||||
use crate::{errors::DownloadError, net::file_url};
|
||||
|
||||
pub async fn download_file<D>(
|
||||
client: &Client,
|
||||
token: &str,
|
||||
path: &str,
|
||||
destination: &mut D,
|
||||
) -> Result<(), DownloadError>
|
||||
where
|
||||
D: AsyncWrite + Unpin,
|
||||
/// Download client.
|
||||
///
|
||||
/// This trait allows you to download files from telegram.
|
||||
pub trait Download<'w>
|
||||
/* FIXME(waffle): ideally, this lifetime ('w) shouldn't be here, but we can't help it without
|
||||
* GATs */
|
||||
{
|
||||
let mut res = client
|
||||
.get(crate::net::file_url(
|
||||
reqwest::Url::parse(crate::net::TELEGRAM_API_URL).expect("failed to parse default url"),
|
||||
token,
|
||||
path,
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
/// Error returned by [`download_file`](Self::download_file)
|
||||
type Err;
|
||||
|
||||
while let Some(chunk) = res.chunk().await? {
|
||||
destination.write_all(&chunk).await?;
|
||||
}
|
||||
/// Future returned from [`download_file`](Self::download_file)
|
||||
type Fut: Future<Output = Result<(), Self::Err>> + Send;
|
||||
|
||||
Ok(())
|
||||
/// Download a file from Telegram into `destination`.
|
||||
///
|
||||
/// `path` can be obtained from [`GetFile`].
|
||||
///
|
||||
/// To download as a stream of chunks, see [`download_file_stream`].
|
||||
///
|
||||
/// ## Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use teloxide_core::{
|
||||
/// requests::{Download, Request, Requester},
|
||||
/// types::File as TgFile,
|
||||
/// Bot,
|
||||
/// };
|
||||
/// use tokio::fs::File;
|
||||
///
|
||||
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
|
||||
/// let bot = Bot::new("TOKEN");
|
||||
///
|
||||
/// let TgFile { file_path, .. } = bot.get_file("*file_id*").send().await?;
|
||||
/// let mut file = File::create("/tmp/test.png").await?;
|
||||
/// bot.download_file(&file_path, &mut file).await?;
|
||||
/// # Ok(()) }
|
||||
/// ```
|
||||
///
|
||||
/// [`GetFile`]: crate::payloads::GetFile
|
||||
/// [`download_file_stream`]: Self::download_file_stream
|
||||
fn download_file(
|
||||
&self,
|
||||
path: &str,
|
||||
destination: &'w mut (dyn AsyncWrite + Unpin + Send),
|
||||
) -> Self::Fut;
|
||||
|
||||
/// Error returned by [`download_file_stream`](Self::download_file_stream)
|
||||
type StreamErr;
|
||||
|
||||
/// Stream returned from [`download_file_stream`]
|
||||
///
|
||||
///[`download_file_stream`]: (Self::download_file_stream)
|
||||
type Stream: Stream<Item = Result<Bytes, Self::StreamErr>> + Send;
|
||||
|
||||
/// Download a file from Telegram as a [`Stream`].
|
||||
///
|
||||
/// `path` can be obtained from the [`GetFile`].
|
||||
///
|
||||
/// To download into an [`AsyncWrite`] (e.g. [`tokio::fs::File`]), see
|
||||
/// [`download_file`].
|
||||
///
|
||||
/// [`GetFile`]: crate::payloads::GetFile
|
||||
/// [`AsyncWrite`]: tokio::io::AsyncWrite
|
||||
/// [`tokio::fs::File`]: tokio::fs::File
|
||||
/// [`download_file`]: Self::download_file
|
||||
fn download_file_stream(&self, path: &str) -> Self::Stream;
|
||||
}
|
||||
|
||||
pub async fn download_file_stream(
|
||||
/// Download a file from Telegram into `dst`.
|
||||
///
|
||||
/// Note: if you don't need to use a different (from you're bot) client and
|
||||
/// don't need to get *all* performance (and you don't, c'mon it's very io-bound
|
||||
/// job), then it's recommended to use [`Download::download_file`]
|
||||
pub fn download_file<'o, D>(
|
||||
client: &Client,
|
||||
api_url: Url,
|
||||
token: &str,
|
||||
path: &str,
|
||||
) -> Result<impl futures::Stream<Item = reqwest::Result<bytes::Bytes>>, reqwest::Error> {
|
||||
let res = client
|
||||
.get(crate::net::file_url(
|
||||
reqwest::Url::parse(crate::net::TELEGRAM_API_URL).expect("failed to parse default url"),
|
||||
token,
|
||||
path,
|
||||
))
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
dst: &'o mut D,
|
||||
) -> impl Future<Output = Result<(), DownloadError>> + 'o
|
||||
where
|
||||
D: ?Sized + AsyncWrite + Unpin,
|
||||
{
|
||||
client.get(file_url(api_url, token, path)).send().then(move |r| async move {
|
||||
let mut res = r?.error_for_status()?;
|
||||
|
||||
Ok(futures::stream::unfold(res, |mut res| async {
|
||||
match res.chunk().await {
|
||||
Err(err) => Some((Err(err), res)),
|
||||
Ok(Some(c)) => Some((Ok(c), res)),
|
||||
Ok(None) => None,
|
||||
while let Some(chunk) = res.chunk().await? {
|
||||
dst.write_all(&chunk).await?;
|
||||
}
|
||||
}))
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
/// Download a file from Telegram as a [`Stream`].
|
||||
///
|
||||
/// Note: if you don't need to use a different (from you're bot) client and
|
||||
/// don't need to get *all* performance (and you don't, c'mon it's very io-bound
|
||||
/// job), then it's recommended to use [`Download::download_file_stream`]
|
||||
pub fn download_file_stream(
|
||||
client: &Client,
|
||||
api_url: Url,
|
||||
token: &str,
|
||||
path: &str,
|
||||
) -> impl Stream<Item = reqwest::Result<Bytes>> + 'static {
|
||||
client.get(file_url(api_url, token, path)).send().into_stream().flat_map(|res| {
|
||||
match res.and_then(Response::error_for_status) {
|
||||
Ok(res) => Either::Left(unfold(res, |mut res| async {
|
||||
match res.chunk().await {
|
||||
Err(err) => Some((Err(err), res)),
|
||||
Ok(Some(c)) => Some((Ok(c), res)),
|
||||
Ok(None) => None,
|
||||
}
|
||||
})),
|
||||
Err(err) => Either::Right(once(ready(Err(err)))),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,5 +1,12 @@
|
|||
//! Network specific items.
|
||||
//!
|
||||
//! Currently this module contains only the file download stuff and the default
|
||||
//! api url.
|
||||
// ... and some internal stuff :P
|
||||
|
||||
pub use self::download::{download_file, download_file_stream, Download};
|
||||
|
||||
pub(crate) use self::{
|
||||
download::{download_file, download_file_stream},
|
||||
request::{request_json, request_multipart},
|
||||
telegram_response::TelegramResponse,
|
||||
};
|
||||
|
@ -8,7 +15,8 @@ mod download;
|
|||
mod request;
|
||||
mod telegram_response;
|
||||
|
||||
pub(crate) const TELEGRAM_API_URL: &str = "https://api.telegram.org";
|
||||
/// Default telegram api url
|
||||
pub const TELEGRAM_API_URL: &str = "https://api.telegram.org";
|
||||
|
||||
/// Creates URL for making HTTPS requests. See the [Telegram documentation].
|
||||
///
|
||||
|
|
|
@ -18,7 +18,7 @@ pub struct MultipartRequest<P> {
|
|||
}
|
||||
|
||||
impl<P> MultipartRequest<P> {
|
||||
pub fn new(bot: Bot, payload: P) -> Self {
|
||||
pub const fn new(bot: Bot, payload: P) -> Self {
|
||||
Self { bot, payload }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,13 +2,15 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
/// This object represents a file ready to be downloaded.
|
||||
///
|
||||
/// The file can be downloaded via the link `https://api.telegram.org/file/bot<token>/<file_path>`.
|
||||
/// It is guaranteed that the link will be valid for at least 1 hour. When the
|
||||
/// link expires, a new one can be requested by calling [`GetFile`].
|
||||
/// The file can be downloaded via the [`Bot::download_file(file_path, dst)`]
|
||||
/// method. It is guaranteed that the path from [`GetFile`] will be valid for at
|
||||
/// least 1 hour. When the path expires, a new one can be requested by calling
|
||||
/// [`GetFile`].
|
||||
///
|
||||
/// [The official docs](https://core.telegram.org/bots/api#file).
|
||||
///
|
||||
/// [`GetFile`]: crate::payloads::GetFile
|
||||
/// [`Bot::download_file(file_path, dst)`]: crate::net::Download::download_file
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)]
|
||||
pub struct File {
|
||||
/// Identifier for this file.
|
||||
|
@ -22,9 +24,10 @@ pub struct File {
|
|||
/// File size, if known.
|
||||
pub file_size: u32,
|
||||
|
||||
// TODO: chacge "Use ..." to use bot.download...
|
||||
/// File path. Use `https://api.telegram.org/file/bot<token>/<file_path>`
|
||||
/// to get the file.
|
||||
/// File path. Use [`Bot::download_file(file_path, dst)`] to get the file.
|
||||
///
|
||||
/// [`Bot::download_file(file_path, dst)`]:
|
||||
/// crate::net::Download::download_file
|
||||
pub file_path: String,
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue