From 6e486f0d78a35688fe7aa437a0f6fe2ab32fa4c8 Mon Sep 17 00:00:00 2001 From: Temirkhan Myrzamadi Date: Mon, 30 Mar 2020 18:41:20 +0600 Subject: [PATCH] Implement webhooks --- Cargo.toml | 2 ++ src/dispatching/update_listeners.rs | 49 +++++++++++++++++++++++++---- src/requests/all/set_webhook.rs | 8 ++--- 3 files changed, 49 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f4a5158..9a0a8f2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,8 @@ serde_with_macros = "1.0.1" teloxide-macros = "0.2.1" +warp = "0.2.2" + [dev-dependencies] smart-default = "0.6.0" rand = "0.7.3" diff --git a/src/dispatching/update_listeners.rs b/src/dispatching/update_listeners.rs index f5bdba74..63a4d1a0 100644 --- a/src/dispatching/update_listeners.rs +++ b/src/dispatching/update_listeners.rs @@ -101,14 +101,24 @@ //! [webhook]: https://en.wikipedia.org/wiki/Webhook use futures::{stream, Stream, StreamExt}; +use tokio::sync::mpsc; + +use warp::Filter; use crate::{ bot::Bot, + prelude::ResponseResult, requests::Request, - types::{AllowedUpdate, Update}, + types::{AllowedUpdate, InputFile, Update}, RequestError, }; -use std::{convert::TryInto, sync::Arc, time::Duration}; +use reqwest::Url; +use std::{ + convert::{Infallible, TryInto}, + net::SocketAddr, + sync::Arc, + time::Duration, +}; /// A generic update listener. pub trait UpdateListener: Stream> { @@ -198,7 +208,34 @@ pub fn polling( .flatten() } -// TODO implement webhook (this actually require webserver and probably we -// should add cargo feature that adds webhook) -//pub fn webhook<'a>(bot: &'a cfg: WebhookConfig) -> Updater> + 'a> {} +pub async fn webhook( + bot: Arc, + addr: SocketAddr, + url: Url, + certificate: Option, + max_connections: Option, + allowed_updates: Option>, +) -> ResponseResult> { + let mut webhook = bot.set_webhook(url.to_string()); + webhook.certificate = certificate; + webhook.max_connections = max_connections; + webhook.allowed_updates = allowed_updates; + webhook.send().await?; + + let (tx, rx) = mpsc::unbounded_channel(); + + let server = warp::post().and(warp::path(url)).and(warp::body::json()).map( + move |update: Update| { + tx.send(Ok(update.clone())) + .expect("Cannot send an update from webhook"); + "" + }, + ); + + tokio::spawn(async move { + // TODO: Tls + warp::serve(server).run(addr).await; + }); + + Ok(rx) +} diff --git a/src/requests/all/set_webhook.rs b/src/requests/all/set_webhook.rs index ad293b5c..606beaad 100644 --- a/src/requests/all/set_webhook.rs +++ b/src/requests/all/set_webhook.rs @@ -29,10 +29,10 @@ use std::sync::Arc; pub struct SetWebhook { #[serde(skip_serializing)] bot: Arc, - url: String, - certificate: Option, - max_connections: Option, - allowed_updates: Option>, + pub(crate) url: String, + pub(crate) certificate: Option, + pub(crate) max_connections: Option, + pub(crate) allowed_updates: Option>, } #[async_trait::async_trait]