Only set webhook if using SSL and also send certificate

This commit is contained in:
Jannes Höke 2016-03-23 10:11:10 +01:00
parent da193711b1
commit 6db377d56d
2 changed files with 40 additions and 34 deletions

View file

@ -214,7 +214,7 @@ class Updater:
cur_interval = poll_interval
self.logger.debug('Updater thread started')
self._set_webhook(None, bootstrap_retries)
self._set_webhook(None, bootstrap_retries, None)
while self.running:
try:
@ -246,12 +246,13 @@ class Updater:
sleep(cur_interval)
def _set_webhook(self, webhook_url, max_retries):
def _set_webhook(self, webhook_url, max_retries, cert):
retries = 0
while 1:
try:
# Remove webhook
self.bot.setWebhook(webhook_url=webhook_url)
self.bot.setWebhook(webhook_url=webhook_url,
certificate=cert)
except (Unauthorized, InvalidToken):
raise
except TelegramError:
@ -282,43 +283,47 @@ class Updater:
bootstrap_retries, webhook_url):
self.logger.debug('Updater thread started')
use_ssl = cert is not None and key is not None
url_path = "/%s" % url_path
if not url_path.startswith('/'):
url_path = '/' + url_path
# Create and start server
self.httpd = WebhookServer((listen, port), WebhookHandler,
self.update_queue, url_path)
if not webhook_url:
webhook_url = self._gen_webhook_url(listen, port, url_path,
use_ssl)
self._set_webhook(webhook_url, bootstrap_retries)
if use_ssl:
# Check SSL-Certificate with openssl, if possible
try:
exit_code = subprocess.call(["openssl", "x509", "-text",
"-noout", "-in", cert],
stdout=open(os.devnull, 'wb'),
stderr=subprocess.STDOUT)
except OSError:
exit_code = 0
self._check_ssl_cert(cert, key)
if exit_code is 0:
try:
self.httpd.socket = ssl.wrap_socket(self.httpd.socket,
certfile=cert,
keyfile=key,
server_side=True)
except ssl.SSLError as error:
self.logger.exception('failed to init SSL socket')
raise TelegramError(str(error))
else:
raise TelegramError('SSL Certificate invalid')
if not webhook_url:
webhook_url = self._gen_webhook_url(listen, port, url_path)
self._set_webhook(webhook_url, bootstrap_retries,
open(cert, 'rb'))
self.httpd.serve_forever(poll_interval=1)
def _gen_webhook_url(self, listen, port, url_path, use_ssl):
return '{proto}://{listen}:{port}{path}'.format(
proto='https' if use_ssl else 'http',
def _check_ssl_cert(self, cert, key):
# Check SSL-Certificate with openssl, if possible
try:
exit_code = subprocess.call(["openssl", "x509", "-text",
"-noout", "-in", cert],
stdout=open(os.devnull, 'wb'),
stderr=subprocess.STDOUT)
except OSError:
exit_code = 0
if exit_code is 0:
try:
self.httpd.socket = ssl.wrap_socket(self.httpd.socket,
certfile=cert,
keyfile=key,
server_side=True)
except ssl.SSLError as error:
self.logger.exception('Failed to init SSL socket')
raise TelegramError(str(error))
else:
raise TelegramError('SSL Certificate invalid')
def _gen_webhook_url(self, listen, port, url_path):
return 'https://{listen}:{port}{path}'.format(
listen=listen,
port=port,
path=url_path)

View file

@ -509,7 +509,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
retries = 3
self._setup_updater('', messages=0, bootstrap_retries=retries)
self.updater._set_webhook('path', retries)
self.updater._set_webhook('path', retries, None)
self.assertEqual(self.updater.bot.bootstrap_attempts, retries)
def test_bootstrap_retries_unauth(self):
@ -518,7 +518,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
bootstrap_err=Unauthorized())
self.assertRaises(Unauthorized, self.updater._set_webhook, 'path',
retries)
retries, None)
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
def test_bootstrap_retries_invalid_token(self):
@ -527,7 +527,7 @@ class UpdaterTest(BaseTest, unittest.TestCase):
bootstrap_err=InvalidToken())
self.assertRaises(InvalidToken, self.updater._set_webhook, 'path',
retries)
retries, None)
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
def test_bootstrap_retries_fail(self):
@ -535,7 +535,8 @@ class UpdaterTest(BaseTest, unittest.TestCase):
self._setup_updater('', messages=0, bootstrap_retries=retries)
self.assertRaisesRegexp(TelegramError, 'test',
self.updater._set_webhook, 'path', retries - 1)
self.updater._set_webhook, 'path', retries - 1,
None)
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
def test_webhook_invalid_posts(self):